1#   Copyright 2020 The PyMC Developers
2#
3#   Licensed under the Apache License, Version 2.0 (the "License");
4#   you may not use this file except in compliance with the License.
5#   You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an "AS IS" BASIS,
11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#   See the License for the specific language governing permissions and
13#   limitations under the License.
14
15import numpy as np
16import pytest
17import theano
18
19from scipy.integrate import odeint
20from scipy.stats import norm
21
22import pymc3 as pm
23
24from pymc3.ode import DifferentialEquation
25from pymc3.ode.utils import augment_system
26
27
28def test_gradients():
29    """Tests the computation of the sensitivities from the theano computation graph"""
30
31    # ODE system for which to compute gradients
32    def ode_func(y, t, p):
33        return np.exp(-t) - p[0] * y[0]
34
35    # Computation of graidients with Theano
36    augmented_ode_func = augment_system(ode_func, 1, 1 + 1)
37
38    # This is the new system, ODE + Sensitivities, which will be integrated
39    def augmented_system(Y, t, p):
40        dydt, ddt_dydp = augmented_ode_func(Y[:1], t, p, Y[1:])
41        derivatives = np.concatenate([dydt, ddt_dydp])
42        return derivatives
43
44    # Create real sensitivities
45    y0 = 0.0
46    t = np.arange(0, 12, 0.25).reshape(-1, 1)
47    a = 0.472
48    p = np.array([y0, a])
49
50    # Derivatives of the analytic solution with respect to y0 and alpha
51    # Treat y0 like a parameter and solve analytically.  Then differentiate.
52    # I used CAS to get these derivatives
53    y0_sensitivity = np.exp(-a * t)
54    a_sensitivity = (
55        -(np.exp(t * (a - 1)) - 1 + (a - 1) * (y0 * a - y0 - 1) * t) * np.exp(-a * t) / (a - 1) ** 2
56    )
57
58    sensitivity = np.c_[y0_sensitivity, a_sensitivity]
59
60    integrated_solutions = odeint(func=augmented_system, y0=[y0, 1, 0], t=t.ravel(), args=(p,))
61    simulated_sensitivity = integrated_solutions[:, 1:]
62
63    np.testing.assert_allclose(sensitivity, simulated_sensitivity, rtol=1e-5)
64
65
66def test_simulate():
67    """Tests the integration in DifferentialEquation"""
68
69    # Create an ODe to integrate
70    def ode_func(y, t, p):
71        return np.exp(-t) - p[0] * y[0]
72
73    # Evaluate exact solution
74    y0 = 0
75    t = np.arange(0, 12, 0.25).reshape(-1, 1)
76    a = 0.472
77    y = 1.0 / (a - 1) * (np.exp(-t) - np.exp(-a * t))
78
79    # Instantiate ODE model
80    ode_model = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1)
81
82    simulated_y, sens = ode_model._simulate([y0], [a])
83
84    assert simulated_y.shape == (len(t), 1)
85    assert sens.shape == (len(t), 1, 1 + 1)
86    np.testing.assert_allclose(y, simulated_y, rtol=1e-5)
87
88
89class TestSensitivityInitialCondition:
90
91    t = np.arange(0, 12, 0.25).reshape(-1, 1)
92
93    def test_sens_ic_scalar_1_param(self):
94        """Tests the creation of the initial condition for the sensitivities"""
95        # Scalar ODE 1 Param
96        # Create an ODe to integrate
97        def ode_func_1(y, t, p):
98            return np.exp(-t) - p[0] * y[0]
99
100        # Instantiate ODE model
101        # Instantiate ODE model
102        model1 = DifferentialEquation(func=ode_func_1, t0=0, times=self.t, n_states=1, n_theta=1)
103
104        # Sensitivity initial condition for this model should be 1 by 2
105        model1_sens_ic = np.array([1, 0])
106
107        np.testing.assert_array_equal(model1_sens_ic, model1._sens_ic)
108
109    def test_sens_ic_scalar_2_param(self):
110        # Scalar ODE 2 Param
111        def ode_func_2(y, t, p):
112            return p[0] * np.exp(-p[0] * t) - p[1] * y[0]
113
114        # Instantiate ODE model
115        model2 = DifferentialEquation(func=ode_func_2, t0=0, times=self.t, n_states=1, n_theta=2)
116
117        model2_sens_ic = np.array([1, 0, 0])
118
119        np.testing.assert_array_equal(model2_sens_ic, model2._sens_ic)
120
121    def test_sens_ic_vector_1_param(self):
122        # Vector ODE 1 Param
123        def ode_func_3(y, t, p):
124            ds = -p[0] * y[0] * y[1]
125            di = p[0] * y[0] * y[1] - y[1]
126
127            return [ds, di]
128
129        # Instantiate ODE model
130        model3 = DifferentialEquation(func=ode_func_3, t0=0, times=self.t, n_states=2, n_theta=1)
131
132        model3_sens_ic = np.array([1, 0, 0, 0, 1, 0])
133
134        np.testing.assert_array_equal(model3_sens_ic, model3._sens_ic)
135
136    def test_sens_ic_vector_2_param(self):
137        # Vector ODE 2 Param
138        def ode_func_4(y, t, p):
139            ds = -p[0] * y[0] * y[1]
140            di = p[0] * y[0] * y[1] - p[1] * y[1]
141
142            return [ds, di]
143
144        # Instantiate ODE model
145        model4 = DifferentialEquation(func=ode_func_4, t0=0, times=self.t, n_states=2, n_theta=2)
146
147        model4_sens_ic = np.array([1, 0, 0, 0, 0, 1, 0, 0])
148
149        np.testing.assert_array_equal(model4_sens_ic, model4._sens_ic)
150
151    def test_sens_ic_vector_3_params(self):
152        # Big System with Many Parameters
153        def ode_func_5(y, t, p):
154            dx = p[0] * (y[1] - y[0])
155            ds = y[0] * (p[1] - y[2]) - y[1]
156            dz = y[0] * y[1] - p[2] * y[2]
157
158            return [dx, ds, dz]
159
160        # Instantiate ODE model
161        model5 = DifferentialEquation(func=ode_func_5, t0=0, times=self.t, n_states=3, n_theta=3)
162
163        # First three columns are derivatives with respect to ode parameters
164        # Last three coluimns are derivatives with repsect to initial condition
165        # So identity matrix should appear in last 3 columns
166        model5_sens_ic = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0]])
167
168        np.testing.assert_array_equal(np.ravel(model5_sens_ic), model5._sens_ic)
169
170
171def test_logp_scalar_ode():
172    """Test the computation of the log probability for these models"""
173
174    # Differential equation
175    def system_1(y, t, p):
176        return np.exp(-t) - p[0] * y[0]
177
178    # Parameters and inital condition
179    alpha = 0.4
180    y0 = 0.0
181    times = np.arange(0.5, 8, 0.5)
182
183    yobs = np.array(
184        [0.30, 0.56, 0.51, 0.55, 0.47, 0.42, 0.38, 0.30, 0.26, 0.21, 0.22, 0.13, 0.13, 0.09, 0.09]
185    )[:, np.newaxis]
186
187    ode_model = DifferentialEquation(func=system_1, t0=0, times=times, n_theta=1, n_states=1)
188
189    integrated_solution, *_ = ode_model._simulate([y0], [alpha])
190
191    assert integrated_solution.shape == yobs.shape
192
193    # compare automatic and manual logp values
194    manual_logp = norm.logpdf(x=np.ravel(yobs), loc=np.ravel(integrated_solution), scale=1).sum()
195    with pm.Model() as model_1:
196        forward = ode_model(theta=[alpha], y0=[y0])
197        y = pm.Normal("y", mu=forward, sd=1, observed=yobs)
198    pymc3_logp = model_1.logp()
199
200    np.testing.assert_allclose(manual_logp, pymc3_logp)
201
202
203class TestErrors:
204    """Test running model for a scalar ODE with 1 parameter"""
205
206    def system(y, t, p):
207        return np.exp(-t) - p[0] * y[0]
208
209    times = np.arange(0, 9)
210
211    ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=1)
212
213    @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32")
214    def test_too_many_params(self):
215        with pytest.raises(pm.ShapeError):
216            self.ode_model(theta=[1, 1], y0=[0])
217
218    @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32")
219    def test_too_many_y0(self):
220        with pytest.raises(pm.ShapeError):
221            self.ode_model(theta=[1], y0=[0, 0])
222
223    @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32")
224    def test_too_few_params(self):
225        with pytest.raises(pm.ShapeError):
226            self.ode_model(theta=[], y0=[1])
227
228    @pytest.mark.xfail(condition=(theano.config.floatX == "float32"), reason="Fails on float32")
229    def test_too_few_y0(self):
230        with pytest.raises(pm.ShapeError):
231            self.ode_model(theta=[1], y0=[])
232
233    def test_func_callable(self):
234        with pytest.raises(ValueError):
235            DifferentialEquation(func=1, t0=0, times=self.times, n_states=1, n_theta=1)
236
237    def test_number_of_states(self):
238        with pytest.raises(ValueError):
239            DifferentialEquation(func=self.system, t0=0, times=self.times, n_states=0, n_theta=1)
240
241    def test_number_of_params(self):
242        with pytest.raises(ValueError):
243            DifferentialEquation(func=self.system, t0=0, times=self.times, n_states=1, n_theta=0)
244
245
246class TestDiffEqModel:
247    def test_op_equality(self):
248        """Tests that the equality of mathematically identical Ops evaluates True"""
249
250        # Create ODE to test with
251        def ode_func(y, t, p):
252            return np.exp(-t) - p[0] * y[0]
253
254        t = np.linspace(0, 2, 12)
255
256        # Instantiate two Ops
257        op_1 = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1)
258        op_2 = DifferentialEquation(func=ode_func, t0=0, times=t, n_states=1, n_theta=1)
259        op_other = DifferentialEquation(
260            func=ode_func, t0=0, times=np.linspace(0, 2, 16), n_states=1, n_theta=1
261        )
262
263        assert op_1 == op_2
264        assert op_1 != op_other
265        return
266
267    def test_scalar_ode_1_param(self):
268        """Test running model for a scalar ODE with 1 parameter"""
269
270        def system(y, t, p):
271            return np.exp(-t) - p[0] * y[0]
272
273        times = np.array(
274            [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5]
275        )
276
277        yobs = np.array(
278            [0.31, 0.57, 0.51, 0.55, 0.47, 0.42, 0.38, 0.3, 0.26, 0.22, 0.22, 0.14, 0.14, 0.09, 0.1]
279        )[:, np.newaxis]
280
281        ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=1)
282
283        with pm.Model() as model:
284            alpha = pm.HalfCauchy("alpha", 1)
285            y0 = pm.Lognormal("y0", 0, 1)
286            sigma = pm.HalfCauchy("sigma", 1)
287            forward = ode_model(theta=[alpha], y0=[y0])
288            y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs)
289            trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False)
290
291        assert trace["alpha"].size > 0
292        assert trace["y0"].size > 0
293        assert trace["sigma"].size > 0
294
295    def test_scalar_ode_2_param(self):
296        """Test running model for a scalar ODE with 2 parameters"""
297
298        def system(y, t, p):
299            return p[0] * np.exp(-p[0] * t) - p[1] * y[0]
300
301        times = np.array(
302            [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5]
303        )
304
305        yobs = np.array(
306            [0.31, 0.57, 0.51, 0.55, 0.47, 0.42, 0.38, 0.3, 0.26, 0.22, 0.22, 0.14, 0.14, 0.09, 0.1]
307        )[:, np.newaxis]
308
309        ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=1, n_theta=2)
310
311        with pm.Model() as model:
312            alpha = pm.HalfCauchy("alpha", 1)
313            beta = pm.HalfCauchy("beta", 1)
314            y0 = pm.Lognormal("y0", 0, 1)
315            sigma = pm.HalfCauchy("sigma", 1)
316            forward = ode_model(theta=[alpha, beta], y0=[y0])
317            y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs)
318
319            trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False)
320
321        assert trace["alpha"].size > 0
322        assert trace["beta"].size > 0
323        assert trace["y0"].size > 0
324        assert trace["sigma"].size > 0
325
326    def test_vector_ode_1_param(self):
327        """Test running model for a vector ODE with 1 parameter"""
328
329        def system(y, t, p):
330            ds = -p[0] * y[0] * y[1]
331            di = p[0] * y[0] * y[1] - y[1]
332            return [ds, di]
333
334        times = np.array([0.0, 0.8, 1.6, 2.4, 3.2, 4.0, 4.8, 5.6, 6.4, 7.2, 8.0])
335
336        yobs = np.array(
337            [
338                [1.02, 0.02],
339                [0.86, 0.12],
340                [0.43, 0.37],
341                [0.14, 0.42],
342                [0.05, 0.43],
343                [0.03, 0.14],
344                [0.02, 0.08],
345                [0.02, 0.04],
346                [0.02, 0.01],
347                [0.02, 0.01],
348                [0.02, 0.01],
349            ]
350        )
351
352        ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=2, n_theta=1)
353
354        with pm.Model() as model:
355            R = pm.Lognormal("R", 1, 5)
356            sigma = pm.HalfCauchy("sigma", 1, shape=2)
357            forward = ode_model(theta=[R], y0=[0.99, 0.01])
358            y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs)
359
360            trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False)
361
362        assert trace["R"].size > 0
363        assert trace["sigma"].size > 0
364
365    def test_vector_ode_2_param(self):
366        """Test running model for a vector ODE with 2 parameters"""
367
368        def system(y, t, p):
369            ds = -p[0] * y[0] * y[1]
370            di = p[0] * y[0] * y[1] - p[1] * y[1]
371            return [ds, di]
372
373        times = np.array([0.0, 0.8, 1.6, 2.4, 3.2, 4.0, 4.8, 5.6, 6.4, 7.2, 8.0])
374
375        yobs = np.array(
376            [
377                [1.02, 0.02],
378                [0.86, 0.12],
379                [0.43, 0.37],
380                [0.14, 0.42],
381                [0.05, 0.43],
382                [0.03, 0.14],
383                [0.02, 0.08],
384                [0.02, 0.04],
385                [0.02, 0.01],
386                [0.02, 0.01],
387                [0.02, 0.01],
388            ]
389        )
390
391        ode_model = DifferentialEquation(func=system, t0=0, times=times, n_states=2, n_theta=2)
392
393        with pm.Model() as model:
394            beta = pm.HalfCauchy("beta", 1)
395            gamma = pm.HalfCauchy("gamma", 1)
396            sigma = pm.HalfCauchy("sigma", 1, shape=2)
397            forward = ode_model(theta=[beta, gamma], y0=[0.99, 0.01])
398            y = pm.Lognormal("y", mu=pm.math.log(forward), sd=sigma, observed=yobs)
399
400            trace = pm.sample(100, tune=0, chains=1, return_inferencedata=False)
401
402        assert trace["beta"].size > 0
403        assert trace["gamma"].size > 0
404        assert trace["sigma"].size > 0
405