1#   Copyright 2020 The PyMC Developers
2#
3#   Licensed under the Apache License, Version 2.0 (the "License");
4#   you may not use this file except in compliance with the License.
5#   You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an "AS IS" BASIS,
11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#   See the License for the specific language governing permissions and
13#   limitations under the License.
14
15import numpy as np
16import pytest
17import theano.tensor as tt
18
19import pymc3 as pm
20
21from pymc3.tests.helpers import SeededTest
22
23
24class TestSMC(SeededTest):
25    def setup_class(self):
26        super().setup_class()
27        self.samples = 1000
28        n = 4
29        mu1 = np.ones(n) * (1.0 / 2)
30        mu2 = -mu1
31
32        stdev = 0.1
33        sigma = np.power(stdev, 2) * np.eye(n)
34        isigma = np.linalg.inv(sigma)
35        dsigma = np.linalg.det(sigma)
36
37        w1 = stdev
38        w2 = 1 - stdev
39
40        def two_gaussians(x):
41            log_like1 = (
42                -0.5 * n * tt.log(2 * np.pi)
43                - 0.5 * tt.log(dsigma)
44                - 0.5 * (x - mu1).T.dot(isigma).dot(x - mu1)
45            )
46            log_like2 = (
47                -0.5 * n * tt.log(2 * np.pi)
48                - 0.5 * tt.log(dsigma)
49                - 0.5 * (x - mu2).T.dot(isigma).dot(x - mu2)
50            )
51            return tt.log(w1 * tt.exp(log_like1) + w2 * tt.exp(log_like2))
52
53        with pm.Model() as self.SMC_test:
54            X = pm.Uniform("X", lower=-2, upper=2.0, shape=n)
55            llk = pm.Potential("muh", two_gaussians(X))
56
57        self.muref = mu1
58
59    def test_sample(self):
60        with self.SMC_test:
61            mtrace = pm.sample_smc(draws=self.samples)
62
63        x = mtrace["X"]
64        mu1d = np.abs(x).mean(axis=0)
65        np.testing.assert_allclose(self.muref, mu1d, rtol=0.0, atol=0.03)
66
67    def test_discrete_continuous(self):
68        with pm.Model() as model:
69            a = pm.Poisson("a", 5)
70            b = pm.HalfNormal("b", 10)
71            y = pm.Normal("y", a, b, observed=[1, 2, 3, 4])
72            trace = pm.sample_smc()
73
74    def test_ml(self):
75        data = np.repeat([1, 0], [50, 50])
76        marginals = []
77        a_prior_0, b_prior_0 = 1.0, 1.0
78        a_prior_1, b_prior_1 = 20.0, 20.0
79
80        for alpha, beta in ((a_prior_0, b_prior_0), (a_prior_1, b_prior_1)):
81            with pm.Model() as model:
82                a = pm.Beta("a", alpha, beta)
83                y = pm.Bernoulli("y", a, observed=data)
84                trace = pm.sample_smc(2000)
85                marginals.append(trace.report.log_marginal_likelihood)
86        # compare to the analytical result
87        assert abs(np.exp(np.mean(marginals[1]) - np.mean(marginals[0])) - 4.0) <= 1
88
89    def test_start(self):
90        with pm.Model() as model:
91            a = pm.Poisson("a", 5)
92            b = pm.HalfNormal("b", 10)
93            y = pm.Normal("y", a, b, observed=[1, 2, 3, 4])
94            start = {
95                "a": np.random.poisson(5, size=500),
96                "b_log__": np.abs(np.random.normal(0, 10, size=500)),
97            }
98            trace = pm.sample_smc(500, start=start)
99
100
101class TestSMCABC(SeededTest):
102    def setup_class(self):
103        super().setup_class()
104        self.data = np.random.normal(loc=0, scale=1, size=1000)
105
106        def normal_sim(a, b):
107            return np.random.normal(a, b, 1000)
108
109        with pm.Model() as self.SMABC_test:
110            a = pm.Normal("a", mu=0, sigma=1)
111            b = pm.HalfNormal("b", sigma=1)
112            s = pm.Simulator(
113                "s", normal_sim, params=(a, b), sum_stat="sort", epsilon=1, observed=self.data
114            )
115            self.s = s
116
117        def quantiles(x):
118            return np.quantile(x, [0.25, 0.5, 0.75])
119
120        def abs_diff(eps, obs_data, sim_data):
121            return np.mean(np.abs((obs_data - sim_data) / eps))
122
123        with pm.Model() as self.SMABC_test2:
124            a = pm.Normal("a", mu=0, sigma=1)
125            b = pm.HalfNormal("b", sigma=1)
126            s = pm.Simulator(
127                "s",
128                normal_sim,
129                params=(a, b),
130                distance=abs_diff,
131                sum_stat=quantiles,
132                epsilon=1,
133                observed=self.data,
134            )
135
136        with pm.Model() as self.SMABC_potential:
137            a = pm.Normal("a", mu=0, sigma=1)
138            b = pm.HalfNormal("b", sigma=1)
139            c = pm.Potential("c", pm.math.switch(a > 0, 0, -np.inf))
140            s = pm.Simulator(
141                "s", normal_sim, params=(a, b), sum_stat="sort", epsilon=1, observed=self.data
142            )
143
144        with pm.Model() as self.SMABC_log_pseudolikelihood:
145            a = pm.Normal("a", mu=0, sigma=1)
146            b = pm.HalfNormal("b", sigma=10)
147            s = pm.Simulator(
148                "s", normal_sim, params=(a, b), sum_stat="mean", epsilon=0.2, observed=self.data
149            )
150
151    def test_one_gaussian(self):
152        with self.SMABC_test:
153            trace = pm.sample_smc(draws=1000, kernel="ABC")
154
155        np.testing.assert_almost_equal(self.data.mean(), trace["a"].mean(), decimal=2)
156        np.testing.assert_almost_equal(self.data.std(), trace["b"].mean(), decimal=1)
157
158    def test_sim_data_ppc(self):
159        with self.SMABC_test:
160            trace, sim_data = pm.sample_smc(draws=1000, kernel="ABC", chains=2, save_sim_data=True)
161            pr_p = pm.sample_prior_predictive(1000)
162            po_p = pm.sample_posterior_predictive(trace, 1000)
163
164        assert sim_data["s"].shape == (2, 1000, 1000)
165        np.testing.assert_almost_equal(self.data.mean(), sim_data["s"].mean(), decimal=2)
166        np.testing.assert_almost_equal(self.data.std(), sim_data["s"].std(), decimal=1)
167        assert pr_p["s"].shape == (1000, 1000)
168        np.testing.assert_almost_equal(0, pr_p["s"].mean(), decimal=1)
169        np.testing.assert_almost_equal(1.4, pr_p["s"].std(), decimal=1)
170        assert po_p["s"].shape == (1000, 1000)
171        np.testing.assert_almost_equal(0, po_p["s"].mean(), decimal=2)
172        np.testing.assert_almost_equal(1, po_p["s"].std(), decimal=1)
173
174    def test_custom_dist_sum(self):
175        with self.SMABC_test2:
176            trace = pm.sample_smc(draws=1000, kernel="ABC")
177
178    def test_potential(self):
179        with self.SMABC_potential:
180            trace = pm.sample_smc(draws=1000, kernel="ABC")
181            assert np.all(trace["a"] >= 0)
182
183    def test_automatic_use_of_sort(self):
184        with pm.Model() as model:
185            s_k = pm.Simulator(
186                "s_k",
187                None,
188                params=None,
189                distance="kullback_leibler",
190                sum_stat="sort",
191                observed=self.data,
192            )
193        assert s_k.distribution.sum_stat is pm.distributions.simulator.identity
194
195    def test_repr_latex(self):
196        expected = "$\\text{s} \\sim  \\text{Simulator}(\\text{normal_sim}(a, b), \\text{gaussian}, \\text{sort})$"
197        assert expected == self.s._repr_latex_()
198        assert self.s._repr_latex_() == self.s.__latex__()
199        assert self.SMABC_test.model._repr_latex_() == self.SMABC_test.model.__latex__()
200
201    def test_name_is_string_type(self):
202        with self.SMABC_potential:
203            assert not self.SMABC_potential.name
204            trace = pm.sample_smc(draws=10, kernel="ABC")
205            assert isinstance(trace._straces[0].name, str)
206
207    def test_named_models_are_unsupported(self):
208        def normal_sim(a, b):
209            return np.random.normal(a, b, 1000)
210
211        with pm.Model(name="NamedModel"):
212            a = pm.Normal("a", mu=0, sigma=1)
213            b = pm.HalfNormal("b", sigma=1)
214            c = pm.Potential("c", pm.math.switch(a > 0, 0, -np.inf))
215            s = pm.Simulator(
216                "s", normal_sim, params=(a, b), sum_stat="sort", epsilon=1, observed=self.data
217            )
218            with pytest.raises(NotImplementedError, match="named models"):
219                pm.sample_smc(draws=10, kernel="ABC")
220
221    def test_log_pseudolikelihood(self):
222        with self.SMABC_log_pseudolikelihood:
223            trace = pm.sample_smc(draws=1000, chains=2, kernel="ABC")
224            assert trace.report.log_pseudolikelihood[0].var() < 1
225            assert trace.report.log_pseudolikelihood[1].var() < 1
226