1import itertools 2import os 3 4import numpy as np 5from statsmodels.duration.hazard_regression import PHReg 6from numpy.testing import (assert_allclose, 7 assert_equal, assert_) 8import pandas as pd 9import pytest 10 11# TODO: Include some corner cases: data sets with empty strata, strata 12# with no events, entry times after censoring times, etc. 13 14# All the R results 15from .results import survival_r_results 16from .results import survival_enet_r_results 17 18""" 19Tests of PHReg against R coxph. 20 21Tests include entry times and stratification. 22 23phreg_gentests.py generates the test data sets and puts them into the 24results folder. 25 26survival.R runs R on all the test data sets and constructs the 27survival_r_results module. 28""" 29 30# Arguments passed to the PHReg fit method. 31args = {"method": "bfgs", "disp": 0} 32 33 34def get_results(n, p, ext, ties): 35 if ext is None: 36 coef_name = "coef_%d_%d_%s" % (n, p, ties) 37 se_name = "se_%d_%d_%s" % (n, p, ties) 38 time_name = "time_%d_%d_%s" % (n, p, ties) 39 hazard_name = "hazard_%d_%d_%s" % (n, p, ties) 40 else: 41 coef_name = "coef_%d_%d_%s_%s" % (n, p, ext, ties) 42 se_name = "se_%d_%d_%s_%s" % (n, p, ext, ties) 43 time_name = "time_%d_%d_%s_%s" % (n, p, ext, ties) 44 hazard_name = "hazard_%d_%d_%s_%s" % (n, p, ext, ties) 45 coef = getattr(survival_r_results, coef_name) 46 se = getattr(survival_r_results, se_name) 47 time = getattr(survival_r_results, time_name) 48 hazard = getattr(survival_r_results, hazard_name) 49 return coef, se, time, hazard 50 51class TestPHReg(object): 52 53 # Load a data file from the results directory 54 @staticmethod 55 def load_file(fname): 56 cur_dir = os.path.dirname(os.path.abspath(__file__)) 57 data = np.genfromtxt(os.path.join(cur_dir, 'results', fname), 58 delimiter=" ") 59 time = data[:,0] 60 status = data[:,1] 61 entry = data[:,2] 62 exog = data[:,3:] 63 64 return time, status, entry, exog 65 66 # Run a single test against R output 67 @staticmethod 68 def do1(fname, ties, entry_f, strata_f): 69 70 # Read the test data. 71 time, status, entry, exog = TestPHReg.load_file(fname) 72 n = len(time) 73 74 vs = fname.split("_") 75 n = int(vs[2]) 76 p = int(vs[3].split(".")[0]) 77 ties1 = ties[0:3] 78 79 # Needs to match the kronecker statement in survival.R 80 strata = np.kron(range(5), np.ones(n // 5)) 81 82 # No stratification or entry times 83 mod = PHReg(time, exog, status, ties=ties) 84 phrb = mod.fit(**args) 85 coef_r, se_r, time_r, hazard_r = get_results(n, p, None, ties1) 86 assert_allclose(phrb.params, coef_r, rtol=1e-3) 87 assert_allclose(phrb.bse, se_r, rtol=1e-4) 88 time_h, cumhaz, surv = phrb.baseline_cumulative_hazard[0] 89 90 # Entry times but no stratification 91 phrb = PHReg(time, exog, status, entry=entry, 92 ties=ties).fit(**args) 93 coef, se, time_r, hazard_r = get_results(n, p, "et", ties1) 94 assert_allclose(phrb.params, coef, rtol=1e-3) 95 assert_allclose(phrb.bse, se, rtol=1e-3) 96 97 # Stratification but no entry times 98 phrb = PHReg(time, exog, status, strata=strata, 99 ties=ties).fit(**args) 100 coef, se, time_r, hazard_r = get_results(n, p, "st", ties1) 101 assert_allclose(phrb.params, coef, rtol=1e-4) 102 assert_allclose(phrb.bse, se, rtol=1e-4) 103 104 # Stratification and entry times 105 phrb = PHReg(time, exog, status, entry=entry, 106 strata=strata, ties=ties).fit(**args) 107 coef, se, time_r, hazard_r = get_results(n, p, "et_st", ties1) 108 assert_allclose(phrb.params, coef, rtol=1e-3) 109 assert_allclose(phrb.bse, se, rtol=1e-4) 110 111 #smoke test 112 time_h, cumhaz, surv = phrb.baseline_cumulative_hazard[0] 113 114 def test_missing(self): 115 116 np.random.seed(34234) 117 time = 50 * np.random.uniform(size=200) 118 status = np.random.randint(0, 2, 200).astype(np.float64) 119 exog = np.random.normal(size=(200,4)) 120 121 time[0:5] = np.nan 122 status[5:10] = np.nan 123 exog[10:15,:] = np.nan 124 125 md = PHReg(time, exog, status, missing='drop') 126 assert_allclose(len(md.endog), 185) 127 assert_allclose(len(md.status), 185) 128 assert_allclose(md.exog.shape, np.r_[185,4]) 129 130 def test_formula(self): 131 132 np.random.seed(34234) 133 time = 50 * np.random.uniform(size=200) 134 status = np.random.randint(0, 2, 200).astype(np.float64) 135 exog = np.random.normal(size=(200,4)) 136 entry = np.zeros_like(time) 137 entry[0:10] = time[0:10] / 2 138 139 df = pd.DataFrame({"time": time, "status": status, 140 "exog1": exog[:, 0], "exog2": exog[:, 1], 141 "exog3": exog[:, 2], "exog4": exog[:, 3], 142 "entry": entry}) 143 144 mod1 = PHReg(time, exog, status, entry=entry) 145 rslt1 = mod1.fit() 146 147 # works with "0 +" on RHS but issues warning 148 fml = "time ~ exog1 + exog2 + exog3 + exog4" 149 mod2 = PHReg.from_formula(fml, df, status=status, 150 entry=entry) 151 rslt2 = mod2.fit() 152 153 mod3 = PHReg.from_formula(fml, df, status="status", 154 entry="entry") 155 rslt3 = mod3.fit() 156 157 assert_allclose(rslt1.params, rslt2.params) 158 assert_allclose(rslt1.params, rslt3.params) 159 assert_allclose(rslt1.bse, rslt2.bse) 160 assert_allclose(rslt1.bse, rslt3.bse) 161 162 def test_formula_cat_interactions(self): 163 164 time = np.r_[1, 2, 3, 4, 5, 6, 7, 8, 9] 165 status = np.r_[1, 1, 0, 0, 1, 0, 1, 1, 1] 166 x1 = np.r_[1, 1, 1, 2, 2, 2, 3, 3, 3] 167 x2 = np.r_[1, 2, 3, 1, 2, 3, 1, 2, 3] 168 df = pd.DataFrame({"time": time, "status": status, 169 "x1": x1, "x2": x2}) 170 171 model1 = PHReg.from_formula("time ~ C(x1) + C(x2) + C(x1)*C(x2)", status="status", 172 data=df) 173 assert_equal(model1.exog.shape, [9, 8]) 174 175 def test_predict_formula(self): 176 177 n = 100 178 np.random.seed(34234) 179 time = 50 * np.random.uniform(size=n) 180 status = np.random.randint(0, 2, n).astype(np.float64) 181 exog = np.random.uniform(1, 2, size=(n, 2)) 182 183 df = pd.DataFrame({"time": time, "status": status, 184 "exog1": exog[:, 0], "exog2": exog[:, 1]}) 185 186 # Works with "0 +" on RHS but issues warning 187 fml = "time ~ exog1 + np.log(exog2) + exog1*exog2" 188 model1 = PHReg.from_formula(fml, df, status=status) 189 result1 = model1.fit() 190 191 from patsy import dmatrix 192 dfp = dmatrix(model1.data.design_info, df) 193 194 pr1 = result1.predict() 195 pr2 = result1.predict(exog=df) 196 pr3 = model1.predict(result1.params, exog=dfp) # No standard errors 197 pr4 = model1.predict(result1.params, 198 cov_params=result1.cov_params(), 199 exog=dfp) 200 201 prl = (pr1, pr2, pr3, pr4) 202 for i in range(4): 203 for j in range(i): 204 assert_allclose(prl[i].predicted_values, 205 prl[j].predicted_values) 206 207 prl = (pr1, pr2, pr4) 208 for i in range(3): 209 for j in range(i): 210 assert_allclose(prl[i].standard_errors, prl[j].standard_errors) 211 212 def test_formula_args(self): 213 214 np.random.seed(34234) 215 n = 200 216 time = 50 * np.random.uniform(size=n) 217 status = np.random.randint(0, 2, size=n).astype(np.float64) 218 exog = np.random.normal(size=(200, 2)) 219 offset = np.random.uniform(size=n) 220 entry = np.random.uniform(0, 1, size=n) * time 221 222 df = pd.DataFrame({"time": time, "status": status, "x1": exog[:, 0], 223 "x2": exog[:, 1], "offset": offset, "entry": entry}) 224 model1 = PHReg.from_formula("time ~ x1 + x2", status="status", offset="offset", 225 entry="entry", data=df) 226 result1 = model1.fit() 227 model2 = PHReg.from_formula("time ~ x1 + x2", status=df.status, offset=df.offset, 228 entry=df.entry, data=df) 229 result2 = model2.fit() 230 assert_allclose(result1.params, result2.params) 231 assert_allclose(result1.bse, result2.bse) 232 233 def test_offset(self): 234 235 np.random.seed(34234) 236 time = 50 * np.random.uniform(size=200) 237 status = np.random.randint(0, 2, 200).astype(np.float64) 238 exog = np.random.normal(size=(200,4)) 239 240 for ties in "breslow", "efron": 241 mod1 = PHReg(time, exog, status) 242 rslt1 = mod1.fit() 243 offset = exog[:,0] * rslt1.params[0] 244 exog = exog[:, 1:] 245 246 mod2 = PHReg(time, exog, status, offset=offset, ties=ties) 247 rslt2 = mod2.fit() 248 249 assert_allclose(rslt2.params, rslt1.params[1:]) 250 251 def test_post_estimation(self): 252 # All regression tests 253 np.random.seed(34234) 254 time = 50 * np.random.uniform(size=200) 255 status = np.random.randint(0, 2, 200).astype(np.float64) 256 exog = np.random.normal(size=(200,4)) 257 258 mod = PHReg(time, exog, status) 259 rslt = mod.fit() 260 mart_resid = rslt.martingale_residuals 261 assert_allclose(np.abs(mart_resid).sum(), 120.72475743348433) 262 263 w_avg = rslt.weighted_covariate_averages 264 assert_allclose(np.abs(w_avg[0]).sum(0), 265 np.r_[7.31008415, 9.77608674,10.89515885, 13.1106801]) 266 267 bc_haz = rslt.baseline_cumulative_hazard 268 v = [np.mean(np.abs(x)) for x in bc_haz[0]] 269 w = np.r_[23.482841556421608, 0.44149255358417017, 270 0.68660114081275281] 271 assert_allclose(v, w) 272 273 score_resid = rslt.score_residuals 274 v = np.r_[ 0.50924792, 0.4533952, 0.4876718, 0.5441128] 275 w = np.abs(score_resid).mean(0) 276 assert_allclose(v, w) 277 278 groups = np.random.randint(0, 3, 200) 279 mod = PHReg(time, exog, status) 280 rslt = mod.fit(groups=groups) 281 robust_cov = rslt.cov_params() 282 v = [0.00513432, 0.01278423, 0.00810427, 0.00293147] 283 w = np.abs(robust_cov).mean(0) 284 assert_allclose(v, w, rtol=1e-6) 285 286 s_resid = rslt.schoenfeld_residuals 287 ii = np.flatnonzero(np.isfinite(s_resid).all(1)) 288 s_resid = s_resid[ii, :] 289 v = np.r_[0.85154336, 0.72993748, 0.73758071, 0.78599333] 290 assert_allclose(np.abs(s_resid).mean(0), v) 291 292 @pytest.mark.smoke 293 def test_summary(self): 294 np.random.seed(34234) 295 time = 50 * np.random.uniform(size=200) 296 status = np.random.randint(0, 2, 200).astype(np.float64) 297 exog = np.random.normal(size=(200,4)) 298 299 mod = PHReg(time, exog, status) 300 rslt = mod.fit() 301 smry = rslt.summary() 302 303 strata = np.kron(np.arange(50), np.ones(4)) 304 mod = PHReg(time, exog, status, strata=strata) 305 rslt = mod.fit() 306 smry = rslt.summary() 307 msg = "3 strata dropped for having no events" 308 assert_(msg in str(smry)) 309 310 groups = np.kron(np.arange(25), np.ones(8)) 311 mod = PHReg(time, exog, status) 312 rslt = mod.fit(groups=groups) 313 smry = rslt.summary() 314 315 entry = np.random.uniform(0.1, 0.8, 200) * time 316 mod = PHReg(time, exog, status, entry=entry) 317 rslt = mod.fit() 318 smry = rslt.summary() 319 msg = "200 observations have positive entry times" 320 assert_(msg in str(smry)) 321 322 @pytest.mark.smoke 323 def test_predict(self): 324 # All smoke tests. We should be able to convert the lhr and hr 325 # tests into real tests against R. There are many options to 326 # this function that may interact in complicated ways. Only a 327 # few key combinations are tested here. 328 np.random.seed(34234) 329 endog = 50 * np.random.uniform(size=200) 330 status = np.random.randint(0, 2, 200).astype(np.float64) 331 exog = np.random.normal(size=(200,4)) 332 333 mod = PHReg(endog, exog, status) 334 rslt = mod.fit() 335 rslt.predict() 336 for pred_type in 'lhr', 'hr', 'cumhaz', 'surv': 337 rslt.predict(pred_type=pred_type) 338 rslt.predict(endog=endog[0:10], pred_type=pred_type) 339 rslt.predict(endog=endog[0:10], exog=exog[0:10,:], 340 pred_type=pred_type) 341 342 @pytest.mark.smoke 343 def test_get_distribution(self): 344 np.random.seed(34234) 345 n = 200 346 exog = np.random.normal(size=(n, 2)) 347 lin_pred = exog.sum(1) 348 elin_pred = np.exp(-lin_pred) 349 time = -elin_pred * np.log(np.random.uniform(size=n)) 350 status = np.ones(n) 351 status[0:20] = 0 352 strata = np.kron(range(5), np.ones(n // 5)) 353 354 mod = PHReg(time, exog, status=status, strata=strata) 355 rslt = mod.fit() 356 357 dist = rslt.get_distribution() 358 359 fitted_means = dist.mean() 360 true_means = elin_pred 361 fitted_var = dist.var() 362 fitted_sd = dist.std() 363 sample = dist.rvs() 364 365 def test_fit_regularized(self): 366 367 # Data set sizes 368 for n,p in (50,2),(100,5): 369 370 # Penalty weights 371 for js,s in enumerate([0,0.1]): 372 373 coef_name = "coef_%d_%d_%d" % (n, p, js) 374 params = getattr(survival_enet_r_results, coef_name) 375 376 fname = "survival_data_%d_%d.csv" % (n, p) 377 time, status, entry, exog = self.load_file(fname) 378 379 exog -= exog.mean(0) 380 exog /= exog.std(0, ddof=1) 381 382 model = PHReg(time, exog, status=status, ties='breslow') 383 sm_result = model.fit_regularized(alpha=s) 384 385 # The agreement is not very high, the issue may be on 386 # the R side. See below for further checks. 387 assert_allclose(sm_result.params, params, rtol=0.3) 388 389 # The penalized log-likelihood that we are maximizing. 390 def plf(params): 391 llf = model.loglike(params) / len(time) 392 L1_wt = 1 393 llf = llf - s * ((1 - L1_wt)*np.sum(params**2) / 2 + L1_wt*np.sum(np.abs(params))) 394 return llf 395 396 # Confirm that we are doing better than glmnet. 397 llf_r = plf(params) 398 llf_sm = plf(sm_result.params) 399 assert_equal(np.sign(llf_sm - llf_r), 1) 400 401 402cur_dir = os.path.dirname(os.path.abspath(__file__)) 403rdir = os.path.join(cur_dir, 'results') 404fnames = os.listdir(rdir) 405fnames = [x for x in fnames if x.startswith("survival") 406 and x.endswith(".csv")] 407 408ties = ("breslow", "efron") 409entry_f = (False, True) 410strata_f = (False, True) 411 412 413@pytest.mark.parametrize('fname,ties,entry_f,strata_f', 414 list(itertools.product(fnames, ties, entry_f, strata_f))) 415def test_r(fname, ties, entry_f, strata_f): 416 TestPHReg.do1(fname, ties, entry_f, strata_f) 417