1# -*- coding: utf-8 -*-
2from statsmodels.compat.platform import PLATFORM_OSX
3
4from statsmodels.regression.process_regression import (
5       ProcessMLE, GaussianCovariance)
6import numpy as np
7import pandas as pd
8import pytest
9
10import collections
11import statsmodels.tools.numdiff as nd
12from numpy.testing import assert_allclose, assert_equal
13
14
15# Parameters for a test model, with or without additive
16# noise.
17def model1(noise):
18
19    mn_par = np.r_[1, 0, -1, 0]
20    sc_par = np.r_[1, 1]
21    sm_par = np.r_[0.5, 0.1]
22
23    if noise:
24        no_par = np.r_[0.25, 0.25]
25    else:
26        no_par = np.array([])
27
28    return mn_par, sc_par, sm_par, no_par
29
30
31def setup1(n, get_model, noise):
32
33    mn_par, sc_par, sm_par, no_par = get_model(noise)
34
35    groups = np.kron(np.arange(n // 5), np.ones(5))
36    time = np.kron(np.ones(n // 5), np.arange(5))
37    time_z = (time - time.mean()) / time.std()
38
39    x_mean = np.random.normal(size=(n, len(mn_par)))
40    x_sc = np.random.normal(size=(n, len(sc_par)))
41    x_sc[:, 0] = 1
42    x_sc[:, 1] = time_z
43    x_sm = np.random.normal(size=(n, len(sm_par)))
44    x_sm[:, 0] = 1
45    x_sm[:, 1] = time_z
46
47    mn = np.dot(x_mean, mn_par)
48    sc = np.exp(np.dot(x_sc, sc_par))
49    sm = np.exp(np.dot(x_sm, sm_par))
50
51    if noise:
52        x_no = np.random.normal(size=(n, len(no_par)))
53        x_no[:, 0] = 1
54        x_no[:, 1] = time_z
55        no = np.exp(np.dot(x_no, no_par))
56    else:
57        x_no = None
58
59    y = mn.copy()
60
61    gc = GaussianCovariance()
62
63    ix = collections.defaultdict(lambda: [])
64    for i, g in enumerate(groups):
65        ix[g].append(i)
66
67    for g, ii in ix.items():
68        c = gc.get_cov(time[ii], sc[ii], sm[ii])
69        r = np.linalg.cholesky(c)
70        y[ii] += np.dot(r, np.random.normal(size=len(ii)))
71
72    # Additive white noise
73    if noise:
74        y += no * np.random.normal(size=y.shape)
75
76    return y, x_mean, x_sc, x_sm, x_no, time, groups
77
78
79def run_arrays(n, get_model, noise):
80
81    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise)
82
83    preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups)
84
85    return preg.fit()
86
87
88@pytest.mark.slow
89@pytest.mark.parametrize("noise", [False, True])
90def test_arrays(noise):
91
92    np.random.seed(8234)
93
94    f = run_arrays(1000, model1, noise)
95    mod = f.model
96
97    f.summary()  # Smoke test
98
99    # Compare the parameter estimates to population values.
100    epar = np.concatenate(model1(noise))
101    assert_allclose(f.params, epar, atol=0.3, rtol=0.3)
102
103    # Test the fitted covariance matrix
104    cv = f.covariance(mod.time[0:5], mod.exog_scale[0:5, :],
105                      mod.exog_smooth[0:5, :])
106    assert_allclose(cv, cv.T)  # Check symmetry
107    a, _ = np.linalg.eig(cv)
108    assert_equal(a > 0, True)  # Check PSD
109
110    # Test predict
111    yhat = f.predict()
112    assert_equal(np.corrcoef(yhat, mod.endog)[0, 1] > 0.2, True)
113    yhatm = f.predict(exog=mod.exog)
114    assert_equal(yhat, yhatm)
115    yhat0 = mod.predict(params=f.params, exog=mod.exog)
116    assert_equal(yhat, yhat0)
117
118    # Smoke test t-test
119    f.t_test(np.eye(len(f.params)))
120
121
122def run_formula(n, get_model, noise):
123
124    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise)
125
126    df = pd.DataFrame({
127        "y": y,
128        "x1": x_mean[:, 0],
129        "x2": x_mean[:, 1],
130        "x3": x_mean[:, 2],
131        "x4": x_mean[:, 3],
132        "xsc1": x_sc[:, 0],
133        "xsc2": x_sc[:, 1],
134        "xsm1": x_sm[:, 0],
135        "xsm2": x_sm[:, 1],
136        "time": time,
137        "groups": groups
138    })
139
140    if noise:
141        df["xno1"] = x_no[:, 0]
142        df["xno2"] = x_no[:, 1]
143
144    mean_formula = "y ~ 0 + x1 + x2 + x3 + x4"
145    scale_formula = "0 + xsc1 + xsc2"
146    smooth_formula = "0 + xsm1 + xsm2"
147
148    if noise:
149        noise_formula = "0 + xno1 + xno2"
150    else:
151        noise_formula = None
152
153    preg = ProcessMLE.from_formula(
154        mean_formula,
155        data=df,
156        scale_formula=scale_formula,
157        smooth_formula=smooth_formula,
158        noise_formula=noise_formula,
159        time="time",
160        groups="groups")
161    f = preg.fit()
162
163    return f, df
164
165
166@pytest.mark.slow
167@pytest.mark.parametrize("noise", [False, True])
168def test_formulas(noise):
169
170    np.random.seed(8789)
171
172    f, df = run_formula(1000, model1, noise)
173    mod = f.model
174
175    f.summary()  # Smoke test
176
177    # Compare the parameter estimates to population values.
178    epar = np.concatenate(model1(noise))
179    assert_allclose(f.params, epar, atol=0.1, rtol=1)
180
181    # Test the fitted covariance matrix
182    exog_scale = pd.DataFrame(mod.exog_scale[0:5, :],
183                              columns=["xsc1", "xsc2"])
184    exog_smooth = pd.DataFrame(mod.exog_smooth[0:5, :],
185                               columns=["xsm1", "xsm2"])
186    cv = f.covariance(mod.time[0:5], exog_scale, exog_smooth)
187    assert_allclose(cv, cv.T)
188    a, _ = np.linalg.eig(cv)
189    assert_equal(a > 0, True)
190
191    # Test predict
192    yhat = f.predict()
193    assert_equal(np.corrcoef(yhat, mod.endog)[0, 1] > 0.2, True)
194    yhatm = f.predict(exog=df)
195    assert_equal(yhat, yhatm)
196    yhat0 = mod.predict(params=f.params, exog=df)
197    assert_equal(yhat, yhat0)
198
199    # Smoke test t-test
200    f.t_test(np.eye(len(f.params)))
201
202
203# Test the score functions using numerical derivatives.
204@pytest.mark.parametrize("noise", [False, True])
205def test_score_numdiff(noise):
206
207    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(1000, model1, noise)
208
209    preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups)
210
211    def loglike(x):
212        return preg.loglike(x)
213
214    q = x_mean.shape[1] + x_sc.shape[1] + x_sm.shape[1]
215    if noise:
216        q += x_no.shape[1]
217
218    np.random.seed(342)
219
220    atol = 2e-3 if PLATFORM_OSX else 1e-2
221    for _ in range(5):
222        par0 = preg._get_start()
223        par = par0 + 0.1 * np.random.normal(size=q)
224        score = preg.score(par)
225        score_nd = nd.approx_fprime(par, loglike, epsilon=1e-7)
226        assert_allclose(score, score_nd, atol=atol, rtol=1e-4)
227