1# -*- coding: utf-8 -*- 2from statsmodels.compat.platform import PLATFORM_OSX 3 4from statsmodels.regression.process_regression import ( 5 ProcessMLE, GaussianCovariance) 6import numpy as np 7import pandas as pd 8import pytest 9 10import collections 11import statsmodels.tools.numdiff as nd 12from numpy.testing import assert_allclose, assert_equal 13 14 15# Parameters for a test model, with or without additive 16# noise. 17def model1(noise): 18 19 mn_par = np.r_[1, 0, -1, 0] 20 sc_par = np.r_[1, 1] 21 sm_par = np.r_[0.5, 0.1] 22 23 if noise: 24 no_par = np.r_[0.25, 0.25] 25 else: 26 no_par = np.array([]) 27 28 return mn_par, sc_par, sm_par, no_par 29 30 31def setup1(n, get_model, noise): 32 33 mn_par, sc_par, sm_par, no_par = get_model(noise) 34 35 groups = np.kron(np.arange(n // 5), np.ones(5)) 36 time = np.kron(np.ones(n // 5), np.arange(5)) 37 time_z = (time - time.mean()) / time.std() 38 39 x_mean = np.random.normal(size=(n, len(mn_par))) 40 x_sc = np.random.normal(size=(n, len(sc_par))) 41 x_sc[:, 0] = 1 42 x_sc[:, 1] = time_z 43 x_sm = np.random.normal(size=(n, len(sm_par))) 44 x_sm[:, 0] = 1 45 x_sm[:, 1] = time_z 46 47 mn = np.dot(x_mean, mn_par) 48 sc = np.exp(np.dot(x_sc, sc_par)) 49 sm = np.exp(np.dot(x_sm, sm_par)) 50 51 if noise: 52 x_no = np.random.normal(size=(n, len(no_par))) 53 x_no[:, 0] = 1 54 x_no[:, 1] = time_z 55 no = np.exp(np.dot(x_no, no_par)) 56 else: 57 x_no = None 58 59 y = mn.copy() 60 61 gc = GaussianCovariance() 62 63 ix = collections.defaultdict(lambda: []) 64 for i, g in enumerate(groups): 65 ix[g].append(i) 66 67 for g, ii in ix.items(): 68 c = gc.get_cov(time[ii], sc[ii], sm[ii]) 69 r = np.linalg.cholesky(c) 70 y[ii] += np.dot(r, np.random.normal(size=len(ii))) 71 72 # Additive white noise 73 if noise: 74 y += no * np.random.normal(size=y.shape) 75 76 return y, x_mean, x_sc, x_sm, x_no, time, groups 77 78 79def run_arrays(n, get_model, noise): 80 81 y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise) 82 83 preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups) 84 85 return preg.fit() 86 87 88@pytest.mark.slow 89@pytest.mark.parametrize("noise", [False, True]) 90def test_arrays(noise): 91 92 np.random.seed(8234) 93 94 f = run_arrays(1000, model1, noise) 95 mod = f.model 96 97 f.summary() # Smoke test 98 99 # Compare the parameter estimates to population values. 100 epar = np.concatenate(model1(noise)) 101 assert_allclose(f.params, epar, atol=0.3, rtol=0.3) 102 103 # Test the fitted covariance matrix 104 cv = f.covariance(mod.time[0:5], mod.exog_scale[0:5, :], 105 mod.exog_smooth[0:5, :]) 106 assert_allclose(cv, cv.T) # Check symmetry 107 a, _ = np.linalg.eig(cv) 108 assert_equal(a > 0, True) # Check PSD 109 110 # Test predict 111 yhat = f.predict() 112 assert_equal(np.corrcoef(yhat, mod.endog)[0, 1] > 0.2, True) 113 yhatm = f.predict(exog=mod.exog) 114 assert_equal(yhat, yhatm) 115 yhat0 = mod.predict(params=f.params, exog=mod.exog) 116 assert_equal(yhat, yhat0) 117 118 # Smoke test t-test 119 f.t_test(np.eye(len(f.params))) 120 121 122def run_formula(n, get_model, noise): 123 124 y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise) 125 126 df = pd.DataFrame({ 127 "y": y, 128 "x1": x_mean[:, 0], 129 "x2": x_mean[:, 1], 130 "x3": x_mean[:, 2], 131 "x4": x_mean[:, 3], 132 "xsc1": x_sc[:, 0], 133 "xsc2": x_sc[:, 1], 134 "xsm1": x_sm[:, 0], 135 "xsm2": x_sm[:, 1], 136 "time": time, 137 "groups": groups 138 }) 139 140 if noise: 141 df["xno1"] = x_no[:, 0] 142 df["xno2"] = x_no[:, 1] 143 144 mean_formula = "y ~ 0 + x1 + x2 + x3 + x4" 145 scale_formula = "0 + xsc1 + xsc2" 146 smooth_formula = "0 + xsm1 + xsm2" 147 148 if noise: 149 noise_formula = "0 + xno1 + xno2" 150 else: 151 noise_formula = None 152 153 preg = ProcessMLE.from_formula( 154 mean_formula, 155 data=df, 156 scale_formula=scale_formula, 157 smooth_formula=smooth_formula, 158 noise_formula=noise_formula, 159 time="time", 160 groups="groups") 161 f = preg.fit() 162 163 return f, df 164 165 166@pytest.mark.slow 167@pytest.mark.parametrize("noise", [False, True]) 168def test_formulas(noise): 169 170 np.random.seed(8789) 171 172 f, df = run_formula(1000, model1, noise) 173 mod = f.model 174 175 f.summary() # Smoke test 176 177 # Compare the parameter estimates to population values. 178 epar = np.concatenate(model1(noise)) 179 assert_allclose(f.params, epar, atol=0.1, rtol=1) 180 181 # Test the fitted covariance matrix 182 exog_scale = pd.DataFrame(mod.exog_scale[0:5, :], 183 columns=["xsc1", "xsc2"]) 184 exog_smooth = pd.DataFrame(mod.exog_smooth[0:5, :], 185 columns=["xsm1", "xsm2"]) 186 cv = f.covariance(mod.time[0:5], exog_scale, exog_smooth) 187 assert_allclose(cv, cv.T) 188 a, _ = np.linalg.eig(cv) 189 assert_equal(a > 0, True) 190 191 # Test predict 192 yhat = f.predict() 193 assert_equal(np.corrcoef(yhat, mod.endog)[0, 1] > 0.2, True) 194 yhatm = f.predict(exog=df) 195 assert_equal(yhat, yhatm) 196 yhat0 = mod.predict(params=f.params, exog=df) 197 assert_equal(yhat, yhat0) 198 199 # Smoke test t-test 200 f.t_test(np.eye(len(f.params))) 201 202 203# Test the score functions using numerical derivatives. 204@pytest.mark.parametrize("noise", [False, True]) 205def test_score_numdiff(noise): 206 207 y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(1000, model1, noise) 208 209 preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups) 210 211 def loglike(x): 212 return preg.loglike(x) 213 214 q = x_mean.shape[1] + x_sc.shape[1] + x_sm.shape[1] 215 if noise: 216 q += x_no.shape[1] 217 218 np.random.seed(342) 219 220 atol = 2e-3 if PLATFORM_OSX else 1e-2 221 for _ in range(5): 222 par0 = preg._get_start() 223 par = par0 + 0.1 * np.random.normal(size=q) 224 score = preg.score(par) 225 score_nd = nd.approx_fprime(par, loglike, epsilon=1e-7) 226 assert_allclose(score, score_nd, atol=atol, rtol=1e-4) 227