1# -*- coding: utf-8 -*- 2""" 3Vector Autoregression (VAR) processes 4 5References 6---------- 7Lütkepohl (2005) New Introduction to Multiple Time Series Analysis 8""" 9import numpy as np 10import numpy.linalg as npl 11from numpy.linalg import slogdet 12 13from statsmodels.tsa.tsatools import rename_trend 14from statsmodels.tools.decorators import deprecated_alias 15from statsmodels.tools.numdiff import approx_fprime, approx_hess 16import statsmodels.tsa.base.tsa_model as tsbase 17from statsmodels.tsa.vector_ar.irf import IRAnalysis 18import statsmodels.tsa.vector_ar.util as util 19from statsmodels.tsa.vector_ar.var_model import VARProcess, VARResults 20 21 22def svar_ckerr(svar_type, A, B): 23 if A is None and (svar_type == 'A' or svar_type == 'AB'): 24 raise ValueError('SVAR of type A or AB but A array not given.') 25 if B is None and (svar_type == 'B' or svar_type == 'AB'): 26 27 raise ValueError('SVAR of type B or AB but B array not given.') 28 29 30class SVAR(tsbase.TimeSeriesModel): 31 r""" 32 Fit VAR and then estimate structural components of A and B, defined: 33 34 .. math:: Ay_t = A_1 y_{t-1} + \ldots + A_p y_{t-p} + B\var(\epsilon_t) 35 36 Parameters 37 ---------- 38 endog : array_like 39 1-d endogenous response variable. The independent variable. 40 dates : array_like 41 must match number of rows of endog 42 svar_type : str 43 "A" - estimate structural parameters of A matrix, B assumed = I 44 "B" - estimate structural parameters of B matrix, A assumed = I 45 "AB" - estimate structural parameters indicated in both A and B matrix 46 A : array_like 47 neqs x neqs with unknown parameters marked with 'E' for estimate 48 B : array_like 49 neqs x neqs with unknown parameters marked with 'E' for estimate 50 51 References 52 ---------- 53 Hamilton (1994) Time Series Analysis 54 """ 55 56 y = deprecated_alias("y", "endog", remove_version="0.11.0") 57 58 def __init__(self, endog, svar_type, dates=None, 59 freq=None, A=None, B=None, missing='none'): 60 super().__init__(endog, None, dates, freq, missing=missing) 61 #(self.endog, self.names, 62 # self.dates) = data_util.interpret_data(endog, names, dates) 63 64 self.neqs = self.endog.shape[1] 65 66 types = ['A', 'B', 'AB'] 67 if svar_type not in types: 68 raise ValueError('SVAR type not recognized, must be in ' 69 + str(types)) 70 self.svar_type = svar_type 71 72 svar_ckerr(svar_type, A, B) 73 74 self.A_original = A 75 self.B_original = B 76 77 # initialize A, B as I if not given 78 # Initialize SVAR masks 79 if A is None: 80 A = np.identity(self.neqs) 81 self.A_mask = A_mask = np.zeros(A.shape, dtype=bool) 82 else: 83 A_mask = np.logical_or(A == 'E', A == 'e') 84 self.A_mask = A_mask 85 if B is None: 86 B = np.identity(self.neqs) 87 self.B_mask = B_mask = np.zeros(B.shape, dtype=bool) 88 else: 89 B_mask = np.logical_or(B == 'E', B == 'e') 90 self.B_mask = B_mask 91 92 # convert A and B to numeric 93 #TODO: change this when masked support is better or with formula 94 #integration 95 Anum = np.zeros(A.shape, dtype=float) 96 Anum[~A_mask] = A[~A_mask] 97 Anum[A_mask] = np.nan 98 self.A = Anum 99 100 Bnum = np.zeros(B.shape, dtype=float) 101 Bnum[~B_mask] = B[~B_mask] 102 Bnum[B_mask] = np.nan 103 self.B = Bnum 104 105 #LikelihoodModel.__init__(self, endog) 106 107 #super().__init__(endog) 108 109 def fit(self, A_guess=None, B_guess=None, maxlags=None, method='ols', 110 ic=None, trend='c', verbose=False, s_method='mle', 111 solver="bfgs", override=False, maxiter=500, maxfun=500): 112 """ 113 Fit the SVAR model and solve for structural parameters 114 115 Parameters 116 ---------- 117 A_guess : array_like, optional 118 A vector of starting values for all parameters to be estimated 119 in A. 120 B_guess : array_like, optional 121 A vector of starting values for all parameters to be estimated 122 in B. 123 maxlags : int 124 Maximum number of lags to check for order selection, defaults to 125 12 * (nobs/100.)**(1./4), see select_order function 126 method : {'ols'} 127 Estimation method to use 128 ic : {'aic', 'fpe', 'hqic', 'bic', None} 129 Information criterion to use for VAR order selection. 130 aic : Akaike 131 fpe : Final prediction error 132 hqic : Hannan-Quinn 133 bic : Bayesian a.k.a. Schwarz 134 verbose : bool, default False 135 Print order selection output to the screen 136 trend, str {"c", "ct", "ctt", "n"} 137 "c" - add constant 138 "ct" - constant and trend 139 "ctt" - constant, linear and quadratic trend 140 "n" - co constant, no trend 141 Note that these are prepended to the columns of the dataset. 142 s_method : {'mle'} 143 Estimation method for structural parameters 144 solver : {'nm', 'newton', 'bfgs', 'cg', 'ncg', 'powell'} 145 Solution method 146 See statsmodels.base for details 147 override : bool, default False 148 If True, returns estimates of A and B without checking 149 order or rank condition 150 maxiter : int, default 500 151 Number of iterations to perform in solution method 152 maxfun : int 153 Number of function evaluations to perform 154 155 Notes 156 ----- 157 Lütkepohl pp. 146-153 158 Hamilton pp. 324-336 159 160 Returns 161 ------- 162 est : SVARResults 163 """ 164 lags = maxlags 165 trend = rename_trend(trend) 166 if ic is not None: 167 selections = self.select_order(maxlags=maxlags, verbose=verbose) 168 if ic not in selections: 169 raise ValueError("%s not recognized, must be among %s" 170 % (ic, sorted(selections))) 171 lags = selections[ic] 172 if verbose: 173 print('Using %d based on %s criterion' % (lags, ic)) 174 else: 175 if lags is None: 176 lags = 1 177 178 self.nobs = len(self.endog) - lags 179 180 # initialize starting parameters 181 start_params = self._get_init_params(A_guess, B_guess) 182 183 return self._estimate_svar(start_params, lags, trend=trend, 184 solver=solver, override=override, 185 maxiter=maxiter, maxfun=maxfun) 186 187 def _get_init_params(self, A_guess, B_guess): 188 """ 189 Returns either the given starting or .1 if none are given. 190 """ 191 192 var_type = self.svar_type.lower() 193 194 n_masked_a = self.A_mask.sum() 195 if var_type in ['ab', 'a']: 196 if A_guess is None: 197 A_guess = np.array([.1]*n_masked_a) 198 else: 199 if len(A_guess) != n_masked_a: 200 msg = 'len(A_guess) = %s, there are %s parameters in A' 201 raise ValueError(msg % (len(A_guess), n_masked_a)) 202 else: 203 A_guess = [] 204 205 n_masked_b = self.B_mask.sum() 206 if var_type in ['ab', 'b']: 207 if B_guess is None: 208 B_guess = np.array([.1]*n_masked_b) 209 else: 210 if len(B_guess) != n_masked_b: 211 msg = 'len(B_guess) = %s, there are %s parameters in B' 212 raise ValueError(msg % (len(B_guess), n_masked_b)) 213 else: 214 B_guess = [] 215 216 return np.r_[A_guess, B_guess] 217 218 def _estimate_svar(self, start_params, lags, maxiter, maxfun, 219 trend='c', solver="nm", override=False): 220 """ 221 lags : int 222 trend : {str, None} 223 As per above 224 """ 225 k_trend = util.get_trendorder(trend) 226 y = self.endog 227 z = util.get_var_endog(y, lags, trend=trend, has_constant='raise') 228 y_sample = y[lags:] 229 230 # Lutkepohl p75, about 5x faster than stated formula 231 var_params = np.linalg.lstsq(z, y_sample, rcond=-1)[0] 232 resid = y_sample - np.dot(z, var_params) 233 234 # Unbiased estimate of covariance matrix $\Sigma_u$ of the white noise 235 # process $u$ 236 # equivalent definition 237 # .. math:: \frac{1}{T - Kp - 1} Y^\prime (I_T - Z (Z^\prime Z)^{-1} 238 # Z^\prime) Y 239 # Ref: Lutkepohl p.75 240 # df_resid right now is T - Kp - 1, which is a suggested correction 241 242 avobs = len(y_sample) 243 244 df_resid = avobs - (self.neqs * lags + k_trend) 245 246 sse = np.dot(resid.T, resid) 247 #TODO: should give users the option to use a dof correction or not 248 omega = sse / df_resid 249 self.sigma_u = omega 250 251 A, B = self._solve_AB(start_params, override=override, 252 solver=solver, 253 maxiter=maxiter) 254 A_mask = self.A_mask 255 B_mask = self.B_mask 256 257 return SVARResults(y, z, var_params, omega, lags, 258 names=self.endog_names, trend=trend, 259 dates=self.data.dates, model=self, 260 A=A, B=B, A_mask=A_mask, B_mask=B_mask) 261 262 def loglike(self, params): 263 """ 264 Loglikelihood for SVAR model 265 266 Notes 267 ----- 268 This method assumes that the autoregressive parameters are 269 first estimated, then likelihood with structural parameters 270 is estimated 271 """ 272 273 #TODO: this does not look robust if A or B is None 274 A = self.A 275 B = self.B 276 A_mask = self.A_mask 277 B_mask = self.B_mask 278 A_len = len(A[A_mask]) 279 B_len = len(B[B_mask]) 280 281 if A is not None: 282 A[A_mask] = params[:A_len] 283 if B is not None: 284 B[B_mask] = params[A_len:A_len+B_len] 285 286 nobs = self.nobs 287 neqs = self.neqs 288 sigma_u = self.sigma_u 289 290 W = np.dot(npl.inv(B),A) 291 trc_in = np.dot(np.dot(W.T,W),sigma_u) 292 sign, b_logdet = slogdet(B**2) #numpy 1.4 compat 293 b_slogdet = sign * b_logdet 294 295 likl = -nobs/2. * (neqs * np.log(2 * np.pi) - 296 np.log(npl.det(A)**2) + b_slogdet + 297 np.trace(trc_in)) 298 299 return likl 300 301 def score(self, AB_mask): 302 """ 303 Return the gradient of the loglike at AB_mask. 304 305 Parameters 306 ---------- 307 AB_mask : unknown values of A and B matrix concatenated 308 309 Notes 310 ----- 311 Return numerical gradient 312 """ 313 loglike = self.loglike 314 return approx_fprime(AB_mask, loglike, epsilon=1e-8) 315 316 def hessian(self, AB_mask): 317 """ 318 Returns numerical hessian. 319 """ 320 loglike = self.loglike 321 return approx_hess(AB_mask, loglike) 322 323 def _solve_AB(self, start_params, maxiter, override=False, solver='bfgs'): 324 """ 325 Solves for MLE estimate of structural parameters 326 327 Parameters 328 ---------- 329 330 override : bool, default False 331 If True, returns estimates of A and B without checking 332 order or rank condition 333 solver : str or None, optional 334 Solver to be used. The default is 'nm' (Nelder-Mead). Other 335 choices are 'bfgs', 'newton' (Newton-Raphson), 'cg' 336 conjugate, 'ncg' (non-conjugate gradient), and 'powell'. 337 maxiter : int, optional 338 The maximum number of iterations. Default is 500. 339 340 Returns 341 ------- 342 A_solve, B_solve: ML solutions for A, B matrices 343 """ 344 #TODO: this could stand a refactor 345 A_mask = self.A_mask 346 B_mask = self.B_mask 347 A = self.A 348 B = self.B 349 A_len = len(A[A_mask]) 350 351 A[A_mask] = start_params[:A_len] 352 B[B_mask] = start_params[A_len:] 353 354 if not override: 355 J = self._compute_J(A, B) 356 self.check_order(J) 357 self.check_rank(J) 358 else: #TODO: change to a warning? 359 print("Order/rank conditions have not been checked") 360 361 retvals = super().fit(start_params=start_params, 362 method=solver, maxiter=maxiter, 363 gtol=1e-20, disp=False).params 364 365 A[A_mask] = retvals[:A_len] 366 B[B_mask] = retvals[A_len:] 367 368 return A, B 369 370 def _compute_J(self, A_solve, B_solve): 371 372 #first compute appropriate duplication matrix 373 # taken from Magnus and Neudecker (1980), 374 #"The Elimination Matrix: Some Lemmas and Applications 375 # the creation of the D_n matrix follows MN (1980) directly, 376 #while the rest follows Hamilton (1994) 377 378 neqs = self.neqs 379 sigma_u = self.sigma_u 380 A_mask = self.A_mask 381 B_mask = self.B_mask 382 383 #first generate duplication matrix, see MN (1980) for notation 384 385 D_nT = np.zeros([int((1.0 / 2) * (neqs) * (neqs + 1)), neqs**2]) 386 387 for j in range(neqs): 388 i=j 389 while j <= i < neqs: 390 u=np.zeros([int((1.0/2)*neqs*(neqs+1)), 1]) 391 u[int(j * neqs + (i + 1) - (1.0 / 2) * (j + 1) * j - 1)] = 1 392 Tij=np.zeros([neqs,neqs]) 393 Tij[i,j]=1 394 Tij[j,i]=1 395 D_nT=D_nT+np.dot(u,(Tij.ravel('F')[:,None]).T) 396 i=i+1 397 398 D_n=D_nT.T 399 D_pl=npl.pinv(D_n) 400 401 #generate S_B 402 S_B = np.zeros((neqs**2, len(A_solve[A_mask]))) 403 S_D = np.zeros((neqs**2, len(B_solve[B_mask]))) 404 405 j = 0 406 j_d = 0 407 if len(A_solve[A_mask]) != 0: 408 A_vec = np.ravel(A_mask, order='F') 409 for k in range(neqs**2): 410 if A_vec[k]: 411 S_B[k,j] = -1 412 j += 1 413 if len(B_solve[B_mask]) != 0: 414 B_vec = np.ravel(B_mask, order='F') 415 for k in range(neqs**2): 416 if B_vec[k]: 417 S_D[k,j_d] = 1 418 j_d +=1 419 420 #now compute J 421 invA = npl.inv(A_solve) 422 J_p1i = np.dot(np.dot(D_pl, np.kron(sigma_u, invA)), S_B) 423 J_p1 = -2.0 * J_p1i 424 J_p2 = np.dot(np.dot(D_pl, np.kron(invA, invA)), S_D) 425 426 J = np.append(J_p1, J_p2, axis=1) 427 428 return J 429 430 def check_order(self, J): 431 if np.size(J, axis=0) < np.size(J, axis=1): 432 raise ValueError("Order condition not met: " 433 "solution may not be unique") 434 435 def check_rank(self, J): 436 rank = np.linalg.matrix_rank(J) 437 if rank < np.size(J, axis=1): 438 raise ValueError("Rank condition not met: " 439 "solution may not be unique.") 440 441 442class SVARProcess(VARProcess): 443 """ 444 Class represents a known SVAR(p) process 445 446 Parameters 447 ---------- 448 coefs : ndarray (p x k x k) 449 intercept : ndarray (length k) 450 sigma_u : ndarray (k x k) 451 names : sequence (length k) 452 A : neqs x neqs np.ndarray with unknown parameters marked with 'E' 453 A_mask : neqs x neqs mask array with known parameters masked 454 B : neqs x neqs np.ndarry with unknown parameters marked with 'E' 455 B_mask : neqs x neqs mask array with known parameters masked 456 """ 457 def __init__(self, coefs, intercept, sigma_u, A_solve, B_solve, 458 names=None): 459 self.k_ar = len(coefs) 460 self.neqs = coefs.shape[1] 461 self.coefs = coefs 462 self.intercept = intercept 463 self.sigma_u = sigma_u 464 self.A_solve = A_solve 465 self.B_solve = B_solve 466 self.names = names 467 468 def orth_ma_rep(self, maxn=10, P=None): 469 """ 470 471 Unavailable for SVAR 472 """ 473 raise NotImplementedError 474 475 def svar_ma_rep(self, maxn=10, P=None): 476 """ 477 478 Compute Structural MA coefficient matrices using MLE 479 of A, B 480 """ 481 if P is None: 482 A_solve = self.A_solve 483 B_solve = self.B_solve 484 P = np.dot(npl.inv(A_solve), B_solve) 485 486 ma_mats = self.ma_rep(maxn=maxn) 487 return np.array([np.dot(coefs, P) for coefs in ma_mats]) 488 489 490class SVARResults(SVARProcess, VARResults): 491 """ 492 Estimate VAR(p) process with fixed number of lags 493 494 Parameters 495 ---------- 496 endog : ndarray 497 endog_lagged : ndarray 498 params : ndarray 499 sigma_u : ndarray 500 lag_order : int 501 model : VAR model instance 502 trend : str {'n', 'c', 'ct'} 503 names : array_like 504 List of names of the endogenous variables in order of appearance in `endog`. 505 dates 506 507 Attributes 508 ---------- 509 aic 510 bic 511 bse 512 coefs : ndarray (p x K x K) 513 Estimated A_i matrices, A_i = coefs[i-1] 514 cov_params 515 dates 516 detomega 517 df_model : int 518 df_resid : int 519 endog 520 endog_lagged 521 fittedvalues 522 fpe 523 intercept 524 info_criteria 525 k_ar : int 526 k_trend : int 527 llf 528 model 529 names 530 neqs : int 531 Number of variables (equations) 532 nobs : int 533 n_totobs : int 534 params 535 k_ar : int 536 Order of VAR process 537 params : ndarray (Kp + 1) x K 538 A_i matrices and intercept in stacked form [int A_1 ... A_p] 539 pvalue 540 names : list 541 variables names 542 resid 543 sigma_u : ndarray (K x K) 544 Estimate of white noise process variance Var[u_t] 545 sigma_u_mle 546 stderr 547 trenorder 548 tvalues 549 """ 550 551 _model_type = 'SVAR' 552 553 def __init__(self, endog, endog_lagged, params, sigma_u, lag_order, 554 A=None, B=None, A_mask=None, B_mask=None, model=None, 555 trend='c', names=None, dates=None): 556 557 self.model = model 558 self.endog = endog 559 self.endog_lagged = endog_lagged 560 self.dates = dates 561 562 self.n_totobs, self.neqs = self.endog.shape 563 self.nobs = self.n_totobs - lag_order 564 k_trend = util.get_trendorder(trend) 565 if k_trend > 0: # make this the polynomial trend order 566 trendorder = k_trend - 1 567 else: 568 trendorder = None 569 self.k_trend = k_trend 570 self.k_exog = k_trend # now (0.9) required by VARProcess 571 self.trendorder = trendorder 572 573 self.exog_names = util.make_lag_names(names, lag_order, k_trend) 574 self.params = params 575 self.sigma_u = sigma_u 576 577 # Each matrix needs to be transposed 578 reshaped = self.params[self.k_trend:] 579 reshaped = reshaped.reshape((lag_order, self.neqs, self.neqs)) 580 581 # Need to transpose each coefficient matrix 582 intercept = self.params[0] 583 coefs = reshaped.swapaxes(1, 2).copy() 584 585 #SVAR components 586 #TODO: if you define these here, you do not also have to define 587 #them in SVAR process, but I left them for now -ss 588 self.A = A 589 self.B = B 590 self.A_mask = A_mask 591 self.B_mask = B_mask 592 593 super().__init__(coefs, intercept, sigma_u, A, B, 594 names=names) 595 596 def irf(self, periods=10, var_order=None): 597 """ 598 Analyze structural impulse responses to shocks in system 599 600 Parameters 601 ---------- 602 periods : int 603 604 Returns 605 ------- 606 irf : IRAnalysis 607 """ 608 A = self.A 609 B= self.B 610 P = np.dot(npl.inv(A), B) 611 612 return IRAnalysis(self, P=P, periods=periods, svar=True) 613 614 def sirf_errband_mc(self, orth=False, repl=1000, steps=10, 615 signif=0.05, seed=None, burn=100, cum=False): 616 """ 617 Compute Monte Carlo integrated error bands assuming normally 618 distributed for impulse response functions 619 620 Parameters 621 ---------- 622 orth : bool, default False 623 Compute orthogonalized impulse response error bands 624 repl : int 625 number of Monte Carlo replications to perform 626 steps : int, default 10 627 number of impulse response periods 628 signif : float (0 < signif <1) 629 Significance level for error bars, defaults to 95% CI 630 seed : int 631 np.random.seed for replications 632 burn : int 633 number of initial observations to discard for simulation 634 cum : bool, default False 635 produce cumulative irf error bands 636 637 Notes 638 ----- 639 Lütkepohl (2005) Appendix D 640 641 Returns 642 ------- 643 Tuple of lower and upper arrays of ma_rep monte carlo standard errors 644 """ 645 neqs = self.neqs 646 mean = self.mean() 647 k_ar = self.k_ar 648 coefs = self.coefs 649 sigma_u = self.sigma_u 650 intercept = self.intercept 651 df_model = self.df_model 652 nobs = self.nobs 653 654 ma_coll = np.zeros((repl, steps + 1, neqs, neqs)) 655 A = self.A 656 B = self.B 657 A_mask = self.A_mask 658 B_mask = self.B_mask 659 A_pass = self.model.A_original 660 B_pass = self.model.B_original 661 s_type = self.model.svar_type 662 663 g_list = [] 664 665 def agg(impulses): 666 if cum: 667 return impulses.cumsum(axis=0) 668 return impulses 669 670 opt_A = A[A_mask] 671 opt_B = B[B_mask] 672 for i in range(repl): 673 # discard first hundred to correct for starting bias 674 sim = util.varsim(coefs, intercept, sigma_u, seed=seed, 675 steps=nobs + burn) 676 sim = sim[burn:] 677 678 smod = SVAR(sim, svar_type=s_type, A=A_pass, B=B_pass) 679 if i == 10: 680 # Use first 10 to update starting val for remainder of fits 681 mean_AB = np.mean(g_list, axis=0) 682 split = len(A[A_mask]) 683 opt_A = mean_AB[:split] 684 opt_B = mean_AB[split:] 685 686 sres = smod.fit(maxlags=k_ar, A_guess=opt_A, B_guess=opt_B) 687 688 if i < 10: 689 # save estimates for starting val if in first 10 690 g_list.append(np.append(sres.A[A_mask].tolist(), 691 sres.B[B_mask].tolist())) 692 ma_coll[i] = agg(sres.svar_ma_rep(maxn=steps)) 693 694 ma_sort = np.sort(ma_coll, axis=0) # sort to get quantiles 695 index = (int(round(signif / 2 * repl) - 1), 696 int(round((1 - signif / 2) * repl) - 1)) 697 lower = ma_sort[index[0], :, :, :] 698 upper = ma_sort[index[1], :, :, :] 699 return lower, upper 700