# -*- coding: utf-8 -*-
"""
Miscellaneous utility code for VAR estimation
"""
from statsmodels.compat.pandas import frequencies
from statsmodels.compat.python import asbytes

import numpy as np
import pandas as pd
from scipy import stats, linalg

import statsmodels.tsa.tsatools as tsa

#-------------------------------------------------------------------------------
# Auxiliary functions for estimation

def get_var_endog(y, lags, trend='c', has_constant='skip'):
    """
    Make predictor matrix for VAR(p) process

    Z := (Z_0, ..., Z_T).T (T x Kp)
    Z_t = [1 y_t y_{t-1} ... y_{t - p + 1}] (Kp x 1)

    Ref: Lütkepohl p.70 (transposed)

    has_constant can be 'raise', 'add', or 'skip'. See add_constant.
    """
    nobs = len(y)
    # Ravel C order, need to put in descending order
    Z = np.array([y[t-lags : t][::-1].ravel() for t in range(lags, nobs)])

    # Add constant, trend, etc.
    trend = tsa.rename_trend(trend)
    if trend != 'n':
        Z = tsa.add_trend(Z, prepend=True, trend=trend,
                          has_constant=has_constant)

    return Z


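# A minimal usage sketch (not part of the original module): it shows the shape
# of the predictor matrix produced by get_var_endog for a small bivariate
# sample. The helper name _example_get_var_endog is illustrative only.
def _example_get_var_endog():
    y = np.arange(12.0).reshape(6, 2)        # 6 observations, 2 variables
    Z = get_var_endog(y, lags=2, trend='c')  # constant plus 2 lags of each variable
    # (nobs - lags) rows and 1 + neqs * lags columns
    assert Z.shape == (4, 5)
    return Z

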
def get_trendorder(trend='c'):
    # Handle constant, etc.
    if trend == 'c':
        trendorder = 1
    elif trend in ('n', 'nc'):
        trendorder = 0
    elif trend == 'ct':
        trendorder = 2
    elif trend == 'ctt':
        trendorder = 3
    else:
        raise ValueError(f"Unknown trend: {trend}")
    return trendorder


def make_lag_names(names, lag_order, trendorder=1, exog=None):
    """
    Produce list of lag-variable names. Constant / trends go at the beginning.

    Examples
    --------
    >>> make_lag_names(['foo', 'bar'], 2, 1)
    ['const', 'L1.foo', 'L1.bar', 'L2.foo', 'L2.bar']
    """
    lag_names = []
    if isinstance(names, str):
        names = [names]

    # take care of lagged endogenous names
    for i in range(1, lag_order + 1):
        for name in names:
            if not isinstance(name, str):
                name = str(name)  # will need consistent unicode handling
            lag_names.append('L' + str(i) + '.' + name)

    # handle the constant name
    if trendorder != 0:
        lag_names.insert(0, 'const')
    if trendorder > 1:
        lag_names.insert(1, 'trend')
    if trendorder > 2:
        lag_names.insert(2, 'trend**2')
    if exog is not None:
        if isinstance(exog, pd.Series):
            exog = pd.DataFrame(exog)
        elif not hasattr(exog, 'ndim'):
            exog = np.asarray(exog)
        if exog.ndim == 1:
            exog = exog[:, None]
        for i in range(exog.shape[1]):
            if isinstance(exog, pd.DataFrame):
                exog_name = str(exog.columns[i])
            else:
                exog_name = "exog" + str(i)
            lag_names.insert(trendorder + i, exog_name)
    return lag_names


def comp_matrix(coefs):
    """
    Return companion matrix for the VAR(1) representation of a VAR(p) process
    (companion form)

    A = [A_1 A_2 ... A_p-1 A_p
         I_K 0       0     0
         0   I_K ... 0     0
         0 ...       I_K   0]
    """
    p, k1, k2 = coefs.shape
    if k1 != k2:
        raise ValueError('coefs must be 3-d with shape (p, k, k).')

    kp = k1 * p

    result = np.zeros((kp, kp))
    result[:k1] = np.concatenate(coefs, axis=1)

    # Set I_K matrices
    if p > 1:
        result[np.arange(k1, kp), np.arange(kp-k1)] = 1

    return result

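
# A small sketch (not in the original source): the companion form of a
# bivariate VAR(2). The coefficient values below are made up; the checks only
# verify the block layout described in the docstring above.
def _example_comp_matrix():
    A1 = np.array([[0.5, 0.1], [0.0, 0.3]])
    A2 = np.array([[0.2, 0.0], [0.1, 0.1]])
    C = comp_matrix(np.array([A1, A2]))
    assert C.shape == (4, 4)
    assert np.allclose(C[:2, :2], A1) and np.allclose(C[:2, 2:], A2)
    assert np.allclose(C[2:, :2], np.eye(2))  # identity block below the top row
    return C

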
#-------------------------------------------------------------------------------
# Miscellaneous stuff


def parse_lutkepohl_data(path):  # pragma: no cover
    """
    Parse data files from Lütkepohl (2005) book

    Source for data files: www.jmulti.de
    """

    from collections import deque
    from datetime import datetime
    import re

    regex = re.compile(asbytes(r'<(.*) (\w)([\d]+)>.*'))
    with open(path, 'rb') as f:
        lines = deque(f)

    to_skip = 0
    while asbytes('*/') not in lines.popleft():
        to_skip += 1

    while True:
        to_skip += 1
        line = lines.popleft()
        m = regex.match(line)
        if m:
            year, freq, start_point = m.groups()
            break

    data = (pd.read_csv(path, delimiter=r"\s+", header=to_skip + 1)
            .to_records(index=False))

    n = len(data)

    # generate the corresponding date range (using pandas for now)
    start_point = int(start_point)
    year = int(year)

    offsets = {
        asbytes('Q'): frequencies.BQuarterEnd(),
        asbytes('M'): frequencies.BMonthEnd(),
        asbytes('A'): frequencies.BYearEnd()
    }

    # create an offset instance for the detected frequency
    offset = offsets[freq]

    inc = offset * (start_point - 1)
    start_date = offset.rollforward(datetime(year, 1, 1)) + inc

    date_range = pd.date_range(start=start_date, freq=offset, periods=n)

    return data, date_range


def norm_signif_level(alpha=0.05):
    return stats.norm.ppf(1 - alpha / 2)


def acf_to_acorr(acf):
    diag = np.diag(acf[0])
    # numpy broadcasting sufficient
    return acf / np.sqrt(np.outer(diag, diag))


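# Illustrative sketch (not in the original module): rescaling a stack of
# autocovariance matrices into autocorrelation matrices. The covariance values
# below are invented for demonstration.
def _example_acf_to_acorr():
    acov = np.array([[[4.0, 1.0], [1.0, 9.0]],   # lag-0 covariance
                     [[2.0, 0.5], [0.3, 3.0]]])  # lag-1 covariance
    acorr = acf_to_acorr(acov)
    # the diagonal of the lag-0 matrix becomes exactly 1 after scaling
    assert np.allclose(np.diag(acorr[0]), 1.0)
    return acorr

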
def varsim(coefs, intercept, sig_u, steps=100, initvalues=None, seed=None):
    """
    Simulate VAR(p) process, given coefficients and assuming Gaussian noise

    Parameters
    ----------
    coefs : ndarray
        Coefficients for the VAR lags of endog.
    intercept : None or ndarray 1-D (neqs,) or (steps, neqs)
        This can be either the intercept for each equation or an offset.
        If None, then the VAR process has a zero intercept.
        If intercept is 1-D, then the same (endog specific) intercept is added
        to all observations.
        If intercept is 2-D, then it is treated as an offset and is added as
        an observation specific intercept to the autoregression. In this case,
        the intercept/offset should have the same number of rows as steps, and
        the same number of columns as endogenous variables (neqs).
    sig_u : ndarray
        Covariance matrix of the residuals or innovations.
        If sig_u is None, then an identity matrix is used.
    steps : {None, int}
        Number of observations to simulate; this includes the initial
        observations needed to start the autoregressive process.
    initvalues : None
        Not currently used.
    seed : {None, int}
        If seed is not None, it is used to seed the numpy.random.RandomState
        instance that draws the Gaussian innovations.

    Returns
    -------
    endog_simulated : ndarray
        Endog of the simulated VAR process
    """
    rs = np.random.RandomState(seed=seed)
    rmvnorm = rs.multivariate_normal
    p, k, k = coefs.shape
    if sig_u is None:
        sig_u = np.eye(k)
    ugen = rmvnorm(np.zeros(len(sig_u)), sig_u, steps)
    result = np.zeros((steps, k))
    if intercept is not None:
        # intercept can be 2-D like an offset variable
        if np.ndim(intercept) > 1:
            if not len(intercept) == len(ugen):
                raise ValueError('2-D intercept needs to have length `steps`')
        # add intercept/offset also to initial values
        result += intercept
        result[p:] += ugen[p:]
    else:
        result[p:] = ugen[p:]

    # add in AR terms
    for t in range(p, steps):
        ygen = result[t]
        for j in range(p):
            ygen += np.dot(coefs[j], result[t-j-1])

    return result


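# A short usage sketch (not part of the original code): simulate a stable
# bivariate VAR(1) with a constant intercept. Coefficients and the seed are
# arbitrary choices for illustration.
def _example_varsim():
    coefs = np.array([[[0.5, 0.1],
                       [0.0, 0.4]]])  # shape (p=1, neqs=2, neqs=2)
    intercept = np.array([1.0, 2.0])
    sim = varsim(coefs, intercept, np.eye(2), steps=200, seed=12345)
    assert sim.shape == (200, 2)       # one row per simulated observation
    return sim

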
def get_index(lst, name):
    try:
        result = lst.index(name)
    except Exception:
        if not isinstance(name, int):
            raise
        result = name
    return result


# method used repeatedly in Sims-Zha error bands
def eigval_decomp(sym_array):
    """
    Returns
    -------
    W: array of eigenvectors
    eigva: list of eigenvalues
    k: index of the largest eigenvalue
    """
    # check if symmetric, do not include shock period
    eigva, W = linalg.eig(sym_array, left=True, right=False)
    k = np.argmax(eigva)
    return W, eigva, k


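# Sketch (not in the original source): eigendecomposition of a small symmetric
# matrix; k indexes the largest eigenvalue, as used for Sims-Zha error bands.
def _example_eigval_decomp():
    S = np.array([[2.0, 1.0], [1.0, 2.0]])
    W, eigva, k = eigval_decomp(S)
    # the eigenvalues of this matrix are 3 and 1, so the largest is 3
    assert np.isclose(eigva[k].real, 3.0)
    return W, eigva, k

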
def vech(A):
    """
    Simple vech operator

    Returns
    -------
    vechvec: vector of all elements on and below diagonal
    """
    length = A.shape[1]
    vechvec = []
    for i in range(length):
        b = i
        while b < length:
            vechvec.append(A[b, i])
            b = b + 1
    vechvec = np.asarray(vechvec)
    return vechvec


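# Minimal sketch (not in the original file): vech of a symmetric 3 x 3 matrix
# stacks the on- and below-diagonal elements column by column.
def _example_vech():
    A = np.array([[1.0, 2.0, 3.0],
                  [2.0, 4.0, 5.0],
                  [3.0, 5.0, 6.0]])
    v = vech(A)
    assert np.allclose(v, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
    return v

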
def seasonal_dummies(n_seasons, len_endog, first_period=0, centered=False):
    """
    Create a matrix of seasonal dummy variables.

    Parameters
    ----------
    n_seasons : int >= 0
        Number of seasons (e.g. 12 for monthly data and 4 for quarterly data).
    len_endog : int >= 0
        Total number of observations.
    first_period : int, default: 0
        Season of the first observation. As an example, suppose we have monthly
        data and the first observation is in March (third month of the year).
        In this case we pass 2 as first_period. (0 for the first season,
        1 for the second, ..., n_seasons-1 for the last season).
        An integer greater than n_seasons-1 is treated in the same way as the
        integer modulo n_seasons.
    centered : bool, default: False
        If True, center (demean) the dummy variables. That is useful in order
        to get seasonal dummies that are orthogonal to the vector of constant
        dummy variables (a vector of ones).

    Returns
    -------
    seasonal_dummies : ndarray (len_endog x n_seasons-1)
    """
    if n_seasons == 0:
        return np.empty((len_endog, 0))
    if n_seasons > 0:
        season_exog = np.zeros((len_endog, n_seasons - 1))
        for i in range(n_seasons - 1):
            season_exog[(i - first_period) % n_seasons::n_seasons, i] = 1

        if centered:
            season_exog -= 1 / n_seasons
        return season_exog

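
# Usage sketch (not in the original module): centered seasonal dummies for two
# years of quarterly data whose first observation falls in the second quarter.
def _example_seasonal_dummies():
    dummies = seasonal_dummies(4, 8, first_period=1, centered=True)
    assert dummies.shape == (8, 3)  # n_seasons - 1 columns
    # centering makes each column sum to zero over complete cycles
    assert np.allclose(dummies.sum(axis=0), 0.0)
    return dummies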