1import itertools
2import os
3
4import numpy as np
5from statsmodels.duration.hazard_regression import PHReg
6from numpy.testing import (assert_allclose,
7                           assert_equal, assert_)
8import pandas as pd
9import pytest
10
11# TODO: Include some corner cases: data sets with empty strata, strata
12#      with no events, entry times after censoring times, etc.
13
14# All the R results
15from .results import survival_r_results
16from .results import survival_enet_r_results
17
18"""
19Tests of PHReg against R coxph.
20
21Tests include entry times and stratification.
22
23phreg_gentests.py generates the test data sets and puts them into the
24results folder.
25
26survival.R runs R on all the test data sets and constructs the
27survival_r_results module.
28"""
29
30# Arguments passed to the PHReg fit method.
31args = {"method": "bfgs", "disp": 0}
32
33
34def get_results(n, p, ext, ties):
35    if ext is None:
36        coef_name = "coef_%d_%d_%s" % (n, p, ties)
37        se_name = "se_%d_%d_%s" % (n, p, ties)
38        time_name = "time_%d_%d_%s" % (n, p, ties)
39        hazard_name = "hazard_%d_%d_%s" % (n, p, ties)
40    else:
41        coef_name = "coef_%d_%d_%s_%s" % (n, p, ext, ties)
42        se_name = "se_%d_%d_%s_%s" % (n, p, ext, ties)
43        time_name = "time_%d_%d_%s_%s" % (n, p, ext, ties)
44        hazard_name = "hazard_%d_%d_%s_%s" % (n, p, ext, ties)
45    coef = getattr(survival_r_results, coef_name)
46    se = getattr(survival_r_results, se_name)
47    time = getattr(survival_r_results, time_name)
48    hazard = getattr(survival_r_results, hazard_name)
49    return coef, se, time, hazard
50
51class TestPHReg(object):
52
53    # Load a data file from the results directory
54    @staticmethod
55    def load_file(fname):
56        cur_dir = os.path.dirname(os.path.abspath(__file__))
57        data = np.genfromtxt(os.path.join(cur_dir, 'results', fname),
58                             delimiter=" ")
59        time = data[:,0]
60        status = data[:,1]
61        entry = data[:,2]
62        exog = data[:,3:]
63
64        return time, status, entry, exog
65
66    # Run a single test against R output
67    @staticmethod
68    def do1(fname, ties, entry_f, strata_f):
69
70        # Read the test data.
71        time, status, entry, exog = TestPHReg.load_file(fname)
72        n = len(time)
73
74        vs = fname.split("_")
75        n = int(vs[2])
76        p = int(vs[3].split(".")[0])
77        ties1 = ties[0:3]
78
79        # Needs to match the kronecker statement in survival.R
80        strata = np.kron(range(5), np.ones(n // 5))
81
82        # No stratification or entry times
83        mod = PHReg(time, exog, status, ties=ties)
84        phrb = mod.fit(**args)
85        coef_r, se_r, time_r, hazard_r = get_results(n, p, None, ties1)
86        assert_allclose(phrb.params, coef_r, rtol=1e-3)
87        assert_allclose(phrb.bse, se_r, rtol=1e-4)
88        time_h, cumhaz, surv = phrb.baseline_cumulative_hazard[0]
89
90        # Entry times but no stratification
91        phrb = PHReg(time, exog, status, entry=entry,
92                     ties=ties).fit(**args)
93        coef, se, time_r, hazard_r = get_results(n, p, "et", ties1)
94        assert_allclose(phrb.params, coef, rtol=1e-3)
95        assert_allclose(phrb.bse, se, rtol=1e-3)
96
97        # Stratification but no entry times
98        phrb = PHReg(time, exog, status, strata=strata,
99                      ties=ties).fit(**args)
100        coef, se, time_r, hazard_r = get_results(n, p, "st", ties1)
101        assert_allclose(phrb.params, coef, rtol=1e-4)
102        assert_allclose(phrb.bse, se, rtol=1e-4)
103
104        # Stratification and entry times
105        phrb = PHReg(time, exog, status, entry=entry,
106                     strata=strata, ties=ties).fit(**args)
107        coef, se, time_r, hazard_r = get_results(n, p, "et_st", ties1)
108        assert_allclose(phrb.params, coef, rtol=1e-3)
109        assert_allclose(phrb.bse, se, rtol=1e-4)
110
111        #smoke test
112        time_h, cumhaz, surv = phrb.baseline_cumulative_hazard[0]
113
114    def test_missing(self):
115
116        np.random.seed(34234)
117        time = 50 * np.random.uniform(size=200)
118        status = np.random.randint(0, 2, 200).astype(np.float64)
119        exog = np.random.normal(size=(200,4))
120
121        time[0:5] = np.nan
122        status[5:10] = np.nan
123        exog[10:15,:] = np.nan
124
125        md = PHReg(time, exog, status, missing='drop')
126        assert_allclose(len(md.endog), 185)
127        assert_allclose(len(md.status), 185)
128        assert_allclose(md.exog.shape, np.r_[185,4])
129
130    def test_formula(self):
131
132        np.random.seed(34234)
133        time = 50 * np.random.uniform(size=200)
134        status = np.random.randint(0, 2, 200).astype(np.float64)
135        exog = np.random.normal(size=(200,4))
136        entry = np.zeros_like(time)
137        entry[0:10] = time[0:10] / 2
138
139        df = pd.DataFrame({"time": time, "status": status,
140                           "exog1": exog[:, 0], "exog2": exog[:, 1],
141                           "exog3": exog[:, 2], "exog4": exog[:, 3],
142                           "entry": entry})
143
144        mod1 = PHReg(time, exog, status, entry=entry)
145        rslt1 = mod1.fit()
146
147        # works with "0 +" on RHS but issues warning
148        fml = "time ~ exog1 + exog2 + exog3 + exog4"
149        mod2 = PHReg.from_formula(fml, df, status=status,
150                                  entry=entry)
151        rslt2 = mod2.fit()
152
153        mod3 = PHReg.from_formula(fml, df, status="status",
154                                  entry="entry")
155        rslt3 = mod3.fit()
156
157        assert_allclose(rslt1.params, rslt2.params)
158        assert_allclose(rslt1.params, rslt3.params)
159        assert_allclose(rslt1.bse, rslt2.bse)
160        assert_allclose(rslt1.bse, rslt3.bse)
161
162    def test_formula_cat_interactions(self):
163
164        time = np.r_[1, 2, 3, 4, 5, 6, 7, 8, 9]
165        status = np.r_[1, 1, 0, 0, 1, 0, 1, 1, 1]
166        x1 = np.r_[1, 1, 1, 2, 2, 2, 3, 3, 3]
167        x2 = np.r_[1, 2, 3, 1, 2, 3, 1, 2, 3]
168        df = pd.DataFrame({"time": time, "status": status,
169                           "x1": x1, "x2": x2})
170
171        model1 = PHReg.from_formula("time ~ C(x1) + C(x2) + C(x1)*C(x2)", status="status",
172                                    data=df)
173        assert_equal(model1.exog.shape, [9, 8])
174
175    def test_predict_formula(self):
176
177        n = 100
178        np.random.seed(34234)
179        time = 50 * np.random.uniform(size=n)
180        status = np.random.randint(0, 2, n).astype(np.float64)
181        exog = np.random.uniform(1, 2, size=(n, 2))
182
183        df = pd.DataFrame({"time": time, "status": status,
184                           "exog1": exog[:, 0], "exog2": exog[:, 1]})
185
186        # Works with "0 +" on RHS but issues warning
187        fml = "time ~ exog1 + np.log(exog2) + exog1*exog2"
188        model1 = PHReg.from_formula(fml, df, status=status)
189        result1 = model1.fit()
190
191        from patsy import dmatrix
192        dfp = dmatrix(model1.data.design_info, df)
193
194        pr1 = result1.predict()
195        pr2 = result1.predict(exog=df)
196        pr3 = model1.predict(result1.params, exog=dfp)  # No standard errors
197        pr4 = model1.predict(result1.params,
198                             cov_params=result1.cov_params(),
199                             exog=dfp)
200
201        prl = (pr1, pr2, pr3, pr4)
202        for i in range(4):
203            for j in range(i):
204                assert_allclose(prl[i].predicted_values,
205                                prl[j].predicted_values)
206
207        prl = (pr1, pr2, pr4)
208        for i in range(3):
209            for j in range(i):
210                assert_allclose(prl[i].standard_errors, prl[j].standard_errors)
211
212    def test_formula_args(self):
213
214        np.random.seed(34234)
215        n = 200
216        time = 50 * np.random.uniform(size=n)
217        status = np.random.randint(0, 2, size=n).astype(np.float64)
218        exog = np.random.normal(size=(200, 2))
219        offset = np.random.uniform(size=n)
220        entry = np.random.uniform(0, 1, size=n) * time
221
222        df = pd.DataFrame({"time": time, "status": status, "x1": exog[:, 0],
223                           "x2": exog[:, 1], "offset": offset, "entry": entry})
224        model1 = PHReg.from_formula("time ~ x1 + x2", status="status", offset="offset",
225                                    entry="entry", data=df)
226        result1 = model1.fit()
227        model2 = PHReg.from_formula("time ~ x1 + x2", status=df.status, offset=df.offset,
228                                    entry=df.entry, data=df)
229        result2 = model2.fit()
230        assert_allclose(result1.params, result2.params)
231        assert_allclose(result1.bse, result2.bse)
232
233    def test_offset(self):
234
235        np.random.seed(34234)
236        time = 50 * np.random.uniform(size=200)
237        status = np.random.randint(0, 2, 200).astype(np.float64)
238        exog = np.random.normal(size=(200,4))
239
240        for ties in "breslow", "efron":
241            mod1 = PHReg(time, exog, status)
242            rslt1 = mod1.fit()
243            offset = exog[:,0] * rslt1.params[0]
244            exog = exog[:, 1:]
245
246            mod2 = PHReg(time, exog, status, offset=offset, ties=ties)
247            rslt2 = mod2.fit()
248
249            assert_allclose(rslt2.params, rslt1.params[1:])
250
251    def test_post_estimation(self):
252        # All regression tests
253        np.random.seed(34234)
254        time = 50 * np.random.uniform(size=200)
255        status = np.random.randint(0, 2, 200).astype(np.float64)
256        exog = np.random.normal(size=(200,4))
257
258        mod = PHReg(time, exog, status)
259        rslt = mod.fit()
260        mart_resid = rslt.martingale_residuals
261        assert_allclose(np.abs(mart_resid).sum(), 120.72475743348433)
262
263        w_avg = rslt.weighted_covariate_averages
264        assert_allclose(np.abs(w_avg[0]).sum(0),
265               np.r_[7.31008415, 9.77608674,10.89515885, 13.1106801])
266
267        bc_haz = rslt.baseline_cumulative_hazard
268        v = [np.mean(np.abs(x)) for x in bc_haz[0]]
269        w = np.r_[23.482841556421608, 0.44149255358417017,
270                  0.68660114081275281]
271        assert_allclose(v, w)
272
273        score_resid = rslt.score_residuals
274        v = np.r_[ 0.50924792, 0.4533952, 0.4876718, 0.5441128]
275        w = np.abs(score_resid).mean(0)
276        assert_allclose(v, w)
277
278        groups = np.random.randint(0, 3, 200)
279        mod = PHReg(time, exog, status)
280        rslt = mod.fit(groups=groups)
281        robust_cov = rslt.cov_params()
282        v = [0.00513432, 0.01278423, 0.00810427, 0.00293147]
283        w = np.abs(robust_cov).mean(0)
284        assert_allclose(v, w, rtol=1e-6)
285
286        s_resid = rslt.schoenfeld_residuals
287        ii = np.flatnonzero(np.isfinite(s_resid).all(1))
288        s_resid = s_resid[ii, :]
289        v = np.r_[0.85154336, 0.72993748, 0.73758071, 0.78599333]
290        assert_allclose(np.abs(s_resid).mean(0), v)
291
292    @pytest.mark.smoke
293    def test_summary(self):
294        np.random.seed(34234)
295        time = 50 * np.random.uniform(size=200)
296        status = np.random.randint(0, 2, 200).astype(np.float64)
297        exog = np.random.normal(size=(200,4))
298
299        mod = PHReg(time, exog, status)
300        rslt = mod.fit()
301        smry = rslt.summary()
302
303        strata = np.kron(np.arange(50), np.ones(4))
304        mod = PHReg(time, exog, status, strata=strata)
305        rslt = mod.fit()
306        smry = rslt.summary()
307        msg = "3 strata dropped for having no events"
308        assert_(msg in str(smry))
309
310        groups = np.kron(np.arange(25), np.ones(8))
311        mod = PHReg(time, exog, status)
312        rslt = mod.fit(groups=groups)
313        smry = rslt.summary()
314
315        entry = np.random.uniform(0.1, 0.8, 200) * time
316        mod = PHReg(time, exog, status, entry=entry)
317        rslt = mod.fit()
318        smry = rslt.summary()
319        msg = "200 observations have positive entry times"
320        assert_(msg in str(smry))
321
322    @pytest.mark.smoke
323    def test_predict(self):
324        # All smoke tests. We should be able to convert the lhr and hr
325        # tests into real tests against R.  There are many options to
326        # this function that may interact in complicated ways.  Only a
327        # few key combinations are tested here.
328        np.random.seed(34234)
329        endog = 50 * np.random.uniform(size=200)
330        status = np.random.randint(0, 2, 200).astype(np.float64)
331        exog = np.random.normal(size=(200,4))
332
333        mod = PHReg(endog, exog, status)
334        rslt = mod.fit()
335        rslt.predict()
336        for pred_type in 'lhr', 'hr', 'cumhaz', 'surv':
337            rslt.predict(pred_type=pred_type)
338            rslt.predict(endog=endog[0:10], pred_type=pred_type)
339            rslt.predict(endog=endog[0:10], exog=exog[0:10,:],
340                         pred_type=pred_type)
341
342    @pytest.mark.smoke
343    def test_get_distribution(self):
344        np.random.seed(34234)
345        n = 200
346        exog = np.random.normal(size=(n, 2))
347        lin_pred = exog.sum(1)
348        elin_pred = np.exp(-lin_pred)
349        time = -elin_pred * np.log(np.random.uniform(size=n))
350        status = np.ones(n)
351        status[0:20] = 0
352        strata = np.kron(range(5), np.ones(n // 5))
353
354        mod = PHReg(time, exog, status=status, strata=strata)
355        rslt = mod.fit()
356
357        dist = rslt.get_distribution()
358
359        fitted_means = dist.mean()
360        true_means = elin_pred
361        fitted_var = dist.var()
362        fitted_sd = dist.std()
363        sample = dist.rvs()
364
365    def test_fit_regularized(self):
366
367        # Data set sizes
368        for n,p in (50,2),(100,5):
369
370            # Penalty weights
371            for js,s in enumerate([0,0.1]):
372
373                coef_name = "coef_%d_%d_%d" % (n, p, js)
374                params = getattr(survival_enet_r_results, coef_name)
375
376                fname = "survival_data_%d_%d.csv" % (n, p)
377                time, status, entry, exog = self.load_file(fname)
378
379                exog -= exog.mean(0)
380                exog /= exog.std(0, ddof=1)
381
382                model = PHReg(time, exog, status=status, ties='breslow')
383                sm_result = model.fit_regularized(alpha=s)
384
385                # The agreement is not very high, the issue may be on
386                # the R side.  See below for further checks.
387                assert_allclose(sm_result.params, params, rtol=0.3)
388
389                # The penalized log-likelihood that we are maximizing.
390                def plf(params):
391                    llf = model.loglike(params) / len(time)
392                    L1_wt = 1
393                    llf = llf - s * ((1 - L1_wt)*np.sum(params**2) / 2 + L1_wt*np.sum(np.abs(params)))
394                    return llf
395
396                # Confirm that we are doing better than glmnet.
397                llf_r = plf(params)
398                llf_sm = plf(sm_result.params)
399                assert_equal(np.sign(llf_sm - llf_r), 1)
400
401
402cur_dir = os.path.dirname(os.path.abspath(__file__))
403rdir = os.path.join(cur_dir, 'results')
404fnames = os.listdir(rdir)
405fnames = [x for x in fnames if x.startswith("survival")
406          and x.endswith(".csv")]
407
408ties = ("breslow", "efron")
409entry_f = (False, True)
410strata_f = (False, True)
411
412
413@pytest.mark.parametrize('fname,ties,entry_f,strata_f',
414                         list(itertools.product(fnames, ties, entry_f, strata_f)))
415def test_r(fname, ties, entry_f, strata_f):
416    TestPHReg.do1(fname, ties, entry_f, strata_f)
417