1# Copyright 2020 The PyMC Developers 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import numpy as np 16import pytest 17import theano 18 19from scipy.integrate import odeint 20from scipy.stats import norm 21 22import pymc3 as pm 23 24from pymc3.ode import DifferentialEquation 25from pymc3.ode.utils import augment_system 26 27 28def test_gradients(): 29 """Tests the computation of the sensitivities from the theano computation graph""" 30 31 # ODE system for which to compute gradients 32 def ode_func(y, t, p): 33 return np.exp(-t) - p[0] * y[0] 34 35 # Computation of graidients with Theano 36 augmented_ode_func = augment_system(ode_func, 1, 1 + 1) 37 38 # This is the new system, ODE + Sensitivities, which will be integrated 39 def augmented_system(Y, t, p): 40 dydt, ddt_dydp = augmented_ode_func(Y[:1], t, p, Y[1:]) 41 derivatives = np.concatenate([dydt, ddt_dydp]) 42 return derivatives 43 44 # Create real sensitivities 45 y0 = 0.0 46 t = np.arange(0, 12, 0.25).reshape(-1, 1) 47 a = 0.472 48 p = np.array([y0, a]) 49 50 # Derivatives of the analytic solution with respect to y0 and alpha 51 # Treat y0 like a parameter and solve analytically. Then differentiate. 52 # I used CAS to get these derivatives 53 y0_sensitivity = np.exp(-a * t) 54 a_sensitivity = ( 55 -(np.exp(t * (a - 1)) - 1 + (a - 1) * (y0 * a - y0 - 1) * t) * np.exp(-a * t) / (a - 1) ** 2 56 ) 57 58 sensitivity = np.c_[y0_sensitivity, a_sensitivity] 59 60 integrated_solutions = odeint(func=augmented_system, y0=[y0, 1, 0], t=t.ravel(), args=(p,)) 61 simulated_sensitivity = integrated_solutions[:, 1:] 62 63 np.testing.assert_allclose(sensitivity, simulated_sensitivity, rtol=1e-5) 64 65 66def test_simulate(): 67 """Tests the integration in DifferentialEquation""" 68 69 # Create an ODe to integrate 70 def ode_func(y, t, p): 71 return np.exp(-t) - p[0] * y[0] 72 73 # Evaluate exact solution 74 y0 = 0 75 t = np.arange(0, 12, 0.25).reshape(-1, 1) 76 a = 0.472 77 y = 1.0 / (a - 1) * (np.exp(-t) - np.exp(-a * t)) 78 79 # Instantiate ODE model 80 ode_model = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1) 81 82 simulated_y, sens = ode_model._simulate([y0], [a]) 83 84 assert simulated_y.shape == (len(t), 1) 85 assert sens.shape == (len(t), 1, 1 + 1) 86 np.testing.assert_allclose(y, simulated_y, rtol=1e-5) 87 88 89class TestSensitivityInitialCondition: 90 91 t = np.arange(0, 12, 0.25).reshape(-1, 1) 92 93 def test_sens_ic_scalar_1_param(self): 94 """Tests the creation of the initial condition for the sensitivities""" 95 # Scalar ODE 1 Param 96 # Create an ODe to integrate 97 def ode_func_1(y, t, p): 98 return np.exp(-t) - p[0] * y[0] 99 100 # Instantiate ODE model 101 # Instantiate ODE model 102 model1 = DifferentialEquation(func=ode_func_1, t0=0, times=self.t, n_states=1, n_theta=1) 103 104 # Sensitivity initial condition for this model should be 1 by 2 105 model1_sens_ic = np.array([1, 0]) 106 107 np.testing.assert_array_equal(model1_sens_ic, model1._sens_ic) 108 109 def test_sens_ic_scalar_2_param(self): 110 # Scalar ODE 2 Param 111 def ode_func_2(y, t, p): 112 return p[0] * np.exp(-p[0] * t) - p[1] * y[0] 113 114 # Instantiate ODE model 115 model2 = DifferentialEquation(func=ode_func_2, t0=0, times=self.t, n_states=1, n_theta=2) 116 117 model2_sens_ic = np.array([1, 0, 0]) 118 119 np.testing.assert_array_equal(model2_sens_ic, model2._sens_ic) 120 121 def test_sens_ic_vector_1_param(self): 122 # Vector ODE 1 Param 123 def ode_func_3(y, t, p): 124 ds = -p[0] * y[0] * y[1] 125 di = p[0] * y[0] * y[1] - y[1] 126 127 return [ds, di] 128 129 # Instantiate ODE model 130 model3 = DifferentialEquation(func=ode_func_3, t0=0, times=self.t, n_states=2, n_theta=1) 131 132 model3_sens_ic = np.array([1, 0, 0, 0, 1, 0]) 133 134 np.testing.assert_array_equal(model3_sens_ic, model3._sens_ic) 135 136 def test_sens_ic_vector_2_param(self): 137 # Vector ODE 2 Param 138 def ode_func_4(y, t, p): 139 ds = -p[0] * y[0] * y[1] 140 di = p[0] * y[0] * y[1] - p[1] * y[1] 141 142 return [ds, di] 143 144 # Instantiate ODE model 145 model4 = DifferentialEquation(func=ode_func_4, t0=0, times=self.t, n_states=2, n_theta=2) 146 147 model4_sens_ic = np.array([1, 0, 0, 0, 0, 1, 0, 0]) 148 149 np.testing.assert_array_equal(model4_sens_ic, model4._sens_ic) 150 151 def test_sens_ic_vector_3_params(self): 152 # Big System with Many Parameters 153 def ode_func_5(y, t, p): 154 dx = p[0] * (y[1] - y[0]) 155 ds = y[0] * (p[1] - y[2]) - y[1] 156 dz = y[0] * y[1] - p[2] * y[2] 157 158 return [dx, ds, dz] 159 160 # Instantiate ODE model 161 model5 = DifferentialEquation(func=ode_func_5, t0=0, times=self.t, n_states=3, n_theta=3) 162 163 # First three columns are derivatives with respect to ode parameters 164 # Last three coluimns are derivatives with repsect to initial condition 165 # So identity matrix should appear in last 3 columns 166 model5_sens_ic = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0]]) 167 168 np.testing.assert_array_equal(np.ravel(model5_sens_ic), model5._sens_ic) 169 170 171def test_logp_scalar_ode(): 172 """Test the computation of the log probability for these models""" 173 174 # Differential equation 175 def system_1(y, t, p): 176 return np.exp(-t) - p[0] * y[0] 177 178 # Parameters and inital condition 179 alpha = 0.4 180 y0 = 0.0 181 times = np.arange(0.5, 8, 0.5) 182 183 yobs = np.array( 184 [0.30, 0.56, 0.51, 0.55, 0.47, 0.42, 0.38, 0.30, 0.26, 0.21, 0.22, 0.13, 0.13, 0.09, 0.09] 185 )[:, np.newaxis] 186 187 ode_model = DifferentialEquation(func=system_1, t0=0, times=times, n_theta=1, n_states=1) 188 189 integrated_solution, *_ = ode_model._simulate([y0], [alpha]) 190 191 assert integrated_solution.shape == yobs.shape 192 193 # compare automatic and manual logp values 194 manual_logp = norm.logpdf(x=np.ravel(yobs), loc=np.ravel(integrated_solution), scale=1).sum() 195 with pm.Model() as model_1: 196 forward = ode_model(theta=[alpha], y0=[y0]) 197 y = pm.Normal("y", mu=forward, sd=1, observed=yobs) 198 pymc3_logp = model_1.logp() 199 200 np.testing.assert_allclose(manual_logp, pymc3_logp) 201 202 203class TestErrors: 204 """Test running model for a scalar ODE with 1 parameter""" 205 206 def system(y, t, p): 207 return np.exp(-t) - p[0] * y[0] 208 209 times = np.arange(0, 9) 210 211 ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=1) 212 213 @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32") 214 def test_too_many_params(self): 215 with pytest.raises(pm.ShapeError): 216 self.ode_model(theta=[1, 1], y0=[0]) 217 218 @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32") 219 def test_too_many_y0(self): 220 with pytest.raises(pm.ShapeError): 221 self.ode_model(theta=[1], y0=[0, 0]) 222 223 @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32") 224 def test_too_few_params(self): 225 with pytest.raises(pm.ShapeError): 226 self.ode_model(theta=[], y0=[1]) 227 228 @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32") 229 def test_too_few_y0(self): 230 with pytest.raises(pm.ShapeError): 231 self.ode_model(theta=[1], y0=[]) 232 233 def test_func_callable(self): 234 with pytest.raises(ValueError): 235 DifferentialEquation(func=1, t0=0, times=self.times, n_states=1, n_theta=1) 236 237 def test_number_of_states(self): 238 with pytest.raises(ValueError): 239 DifferentialEquation(func=self.system, t0=0, times=self.times, n_states=0, n_theta=1) 240 241 def test_number_of_params(self): 242 with pytest.raises(ValueError): 243 DifferentialEquation(func=self.system, t0=0, times=self.times, n_states=1, n_theta=0) 244 245 246class TestDiffEqModel: 247 def test_op_equality(self): 248 """Tests that the equality of mathematically identical Ops evaluates True""" 249 250 # Create ODE to test with 251 def ode_func(y, t, p): 252 return np.exp(-t) - p[0] * y[0] 253 254 t = np.linspace(0, 2, 12) 255 256 # Instantiate two Ops 257 op_1 = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1) 258 op_2 = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1) 259 op_other = DifferentialEquation( 260 func=ode_func, t0=0, times=np.linspace(0, 2, 16), n_states=1, n_theta=1 261 ) 262 263 assert op_1 == op_2 264 assert op_1 != op_other 265 return 266 267 def test_scalar_ode_1_param(self): 268 """Test running model for a scalar ODE with 1 parameter""" 269 270 def system(y, t, p): 271 return np.exp(-t) - p[0] * y[0] 272 273 times = np.array( 274 [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5] 275 ) 276 277 yobs = np.array( 278 [0.31, 0.57, 0.51, 0.55, 0.47, 0.42, 0.38, 0.3, 0.26, 0.22, 0.22, 0.14, 0.14, 0.09, 0.1] 279 )[:, np.newaxis] 280 281 ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=1) 282 283 with pm.Model() as model: 284 alpha = pm.HalfCauchy("alpha", 1) 285 y0 = pm.Lognormal("y0", 0, 1) 286 sigma = pm.HalfCauchy("sigma", 1) 287 forward = ode_model(theta=[alpha], y0=[y0]) 288 y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs) 289 trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False) 290 291 assert trace["alpha"].size > 0 292 assert trace["y0"].size > 0 293 assert trace["sigma"].size > 0 294 295 def test_scalar_ode_2_param(self): 296 """Test running model for a scalar ODE with 2 parameters""" 297 298 def system(y, t, p): 299 return p[0] * np.exp(-p[0] * t) - p[1] * y[0] 300 301 times = np.array( 302 [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5] 303 ) 304 305 yobs = np.array( 306 [0.31, 0.57, 0.51, 0.55, 0.47, 0.42, 0.38, 0.3, 0.26, 0.22, 0.22, 0.14, 0.14, 0.09, 0.1] 307 )[:, np.newaxis] 308 309 ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=2) 310 311 with pm.Model() as model: 312 alpha = pm.HalfCauchy("alpha", 1) 313 beta = pm.HalfCauchy("beta", 1) 314 y0 = pm.Lognormal("y0", 0, 1) 315 sigma = pm.HalfCauchy("sigma", 1) 316 forward = ode_model(theta=[alpha, beta], y0=[y0]) 317 y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs) 318 319 trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False) 320 321 assert trace["alpha"].size > 0 322 assert trace["beta"].size > 0 323 assert trace["y0"].size > 0 324 assert trace["sigma"].size > 0 325 326 def test_vector_ode_1_param(self): 327 """Test running model for a vector ODE with 1 parameter""" 328 329 def system(y, t, p): 330 ds = -p[0] * y[0] * y[1] 331 di = p[0] * y[0] * y[1] - y[1] 332 return [ds, di] 333 334 times = np.array([0.0, 0.8, 1.6, 2.4, 3.2, 4.0, 4.8, 5.6, 6.4, 7.2, 8.0]) 335 336 yobs = np.array( 337 [ 338 [1.02, 0.02], 339 [0.86, 0.12], 340 [0.43, 0.37], 341 [0.14, 0.42], 342 [0.05, 0.43], 343 [0.03, 0.14], 344 [0.02, 0.08], 345 [0.02, 0.04], 346 [0.02, 0.01], 347 [0.02, 0.01], 348 [0.02, 0.01], 349 ] 350 ) 351 352 ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=2, n_theta=1) 353 354 with pm.Model() as model: 355 R = pm.Lognormal("R", 1, 5) 356 sigma = pm.HalfCauchy("sigma", 1, shape=2) 357 forward = ode_model(theta=[R], y0=[0.99, 0.01]) 358 y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs) 359 360 trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False) 361 362 assert trace["R"].size > 0 363 assert trace["sigma"].size > 0 364 365 def test_vector_ode_2_param(self): 366 """Test running model for a vector ODE with 2 parameters""" 367 368 def system(y, t, p): 369 ds = -p[0] * y[0] * y[1] 370 di = p[0] * y[0] * y[1] - p[1] * y[1] 371 return [ds, di] 372 373 times = np.array([0.0, 0.8, 1.6, 2.4, 3.2, 4.0, 4.8, 5.6, 6.4, 7.2, 8.0]) 374 375 yobs = np.array( 376 [ 377 [1.02, 0.02], 378 [0.86, 0.12], 379 [0.43, 0.37], 380 [0.14, 0.42], 381 [0.05, 0.43], 382 [0.03, 0.14], 383 [0.02, 0.08], 384 [0.02, 0.04], 385 [0.02, 0.01], 386 [0.02, 0.01], 387 [0.02, 0.01], 388 ] 389 ) 390 391 ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=2, n_theta=2) 392 393 with pm.Model() as model: 394 beta = pm.HalfCauchy("beta", 1) 395 gamma = pm.HalfCauchy("gamma", 1) 396 sigma = pm.HalfCauchy("sigma", 1, shape=2) 397 forward = ode_model(theta=[beta, gamma], y0=[0.99, 0.01]) 398 y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs) 399 400 trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False) 401 402 assert trace["beta"].size > 0 403 assert trace["gamma"].size > 0 404 assert trace["sigma"].size > 0 405