import pytest
import numpy as np
from numpy.testing import assert_allclose

from keras.layers import Input
from keras import regularizers
from keras.utils.test_utils import layer_test
from keras.layers import normalization
from keras.models import Sequential, Model
from keras import backend as K

input_1 = np.arange(10)
input_2 = np.zeros(10)
input_3 = np.ones((10,))
input_4 = np.expand_dims(np.arange(10.), axis=1)
input_shapes = [np.ones((10, 10)), np.ones((10, 10, 10))]


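# Hedged helper (an addition, not from the original file): a pure-NumPy
# reference for the batch normalization transform,
#     y = gamma * (x - mean) / sqrt(var + epsilon) + beta,
# used by the sketch test further down to cross-check the layer's
# inference output. The name `_bn_reference` is hypothetical.
def _bn_reference(x, mean, var, gamma=1., beta=0., epsilon=1e-3):
    return gamma * (x - mean) / np.sqrt(var + epsilon) + beta

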
def test_basic_batchnorm():
    layer_test(normalization.BatchNormalization,
               kwargs={'momentum': 0.9,
                       'epsilon': 0.1,
                       'gamma_regularizer': regularizers.l2(0.01),
                       'beta_regularizer': regularizers.l2(0.01)},
               input_shape=(3, 4, 2))
    layer_test(normalization.BatchNormalization,
               kwargs={'momentum': 0.9,
                       'epsilon': 0.1,
                       'axis': 1},
               input_shape=(1, 4, 1))
    layer_test(normalization.BatchNormalization,
               kwargs={'gamma_initializer': 'ones',
                       'beta_initializer': 'ones',
                       'moving_mean_initializer': 'zeros',
                       'moving_variance_initializer': 'ones'},
               input_shape=(3, 4, 2, 4))
    if K.backend() != 'theano':
        layer_test(normalization.BatchNormalization,
                   kwargs={'momentum': 0.9,
                           'epsilon': 0.1,
                           'axis': 1,
                           'scale': False,
                           'center': False},
                   input_shape=(3, 4, 2, 4))


def test_batchnorm_correctness_1d():
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(input_shape=(10,), momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='rmsprop')

    # centered on 5.0, standard deviation 10.0
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10))
    model.fit(x, x, epochs=5, verbose=0)
    out = model.predict(x)
    out -= K.eval(norm.beta)
    out /= K.eval(norm.gamma)

    assert_allclose(out.mean(), 0.0, atol=1e-1)
    assert_allclose(out.std(), 1.0, atol=1e-1)


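# A minimal companion sketch (an addition, not part of the original suite):
# at inference time BatchNormalization should reduce to the closed-form
# transform computed by the hypothetical `_bn_reference` helper above,
# evaluated with the layer's own learned weights and moving statistics.
def test_batchnorm_inference_formula_sketch():
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(input_shape=(10,), momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='rmsprop')

    x = np.random.normal(loc=5.0, scale=10.0, size=(100, 10))
    model.fit(x, x, epochs=2, verbose=0)

    expected = _bn_reference(x,
                             K.eval(norm.moving_mean),
                             K.eval(norm.moving_variance),
                             K.eval(norm.gamma),
                             K.eval(norm.beta),
                             epsilon=norm.epsilon)
    assert_allclose(model.predict(x), expected, atol=1e-4)

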
def test_batchnorm_correctness_2d():
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(axis=1, input_shape=(10, 6),
                                            momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='rmsprop')

    # centered on 5.0, standard deviation 10.0
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10, 6))
    model.fit(x, x, epochs=5, verbose=0)
    out = model.predict(x)
    out -= np.reshape(K.eval(norm.beta), (1, 10, 1))
    out /= np.reshape(K.eval(norm.gamma), (1, 10, 1))

    assert_allclose(out.mean(axis=(0, 2)), 0.0, atol=1.1e-1)
    assert_allclose(out.std(axis=(0, 2)), 1.0, atol=1.1e-1)


def test_batchnorm_training_argument():
    np.random.seed(1337)
    bn1 = normalization.BatchNormalization(input_shape=(10,))
    x1 = Input(shape=(10,))
    # Calling with training=True forces batch statistics and registers
    # moving-average updates on the layer.
    y1 = bn1(x1, training=True)
    assert bn1.updates

    model1 = Model(x1, y1)
    x = np.random.normal(loc=5.0, scale=10.0, size=(20, 10))
    output_a = model1.predict(x)

    model1.compile(loss='mse', optimizer='rmsprop')
    model1.fit(x, x, epochs=1, verbose=0)
    output_b = model1.predict(x)
    assert np.abs(np.sum(output_a - output_b)) > 0.1
    assert_allclose(output_b.mean(), 0.0, atol=1e-1)
    assert_allclose(output_b.std(), 1.0, atol=1e-1)

    # With training=False, no updates should be registered.
    bn2 = normalization.BatchNormalization(input_shape=(10,))
    x2 = Input(shape=(10,))
    bn2(x2, training=False)
    assert not bn2.updates


def test_batchnorm_mode_twice():
    # This is a regression test for issue #4881 with the old
    # batch normalization functions in the Theano backend.
    model = Sequential()
    model.add(normalization.BatchNormalization(input_shape=(10, 5, 5), axis=1))
    model.add(normalization.BatchNormalization(input_shape=(10, 5, 5), axis=1))
    model.compile(loss='mse', optimizer='sgd')

    x = np.random.normal(loc=5.0, scale=10.0, size=(20, 10, 5, 5))
    model.fit(x, x, epochs=1, verbose=0)
    model.predict(x)


def test_batchnorm_convnet():
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(axis=1, input_shape=(3, 4, 4),
                                            momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='sgd')

    # centered on 5.0, standard deviation 10.0
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 3, 4, 4))
    model.fit(x, x, epochs=4, verbose=0)
    out = model.predict(x)
    out -= np.reshape(K.eval(norm.beta), (1, 3, 1, 1))
    out /= np.reshape(K.eval(norm.gamma), (1, 3, 1, 1))

    assert_allclose(np.mean(out, axis=(0, 2, 3)), 0.0, atol=1e-1)
    assert_allclose(np.std(out, axis=(0, 2, 3)), 1.0, atol=1e-1)


@pytest.mark.skipif(K.backend() == 'theano',
                    reason='Bug with theano backend')
def test_batchnorm_convnet_no_center_no_scale():
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(axis=-1, center=False, scale=False,
                                            input_shape=(3, 4, 4), momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='sgd')

    # centered on 5.0, standard deviation 10.0
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 3, 4, 4))
    model.fit(x, x, epochs=4, verbose=0)
    out = model.predict(x)

    assert_allclose(np.mean(out, axis=(0, 2, 3)), 0.0, atol=1e-1)
    assert_allclose(np.std(out, axis=(0, 2, 3)), 1.0, atol=1e-1)


def test_shared_batchnorm():
    '''Test that a BN layer can be shared
    across different data streams.
    '''
    # Test single layer reuse
    bn = normalization.BatchNormalization(input_shape=(10,))
    x1 = Input(shape=(10,))
    bn(x1)

    x2 = Input(shape=(10,))
    y2 = bn(x2)

    x = np.random.normal(loc=5.0, scale=10.0, size=(2, 10))
    model = Model(x2, y2)
    model.compile('sgd', 'mse')
    model.train_on_batch(x, x)

    # Test model-level reuse
    x3 = Input(shape=(10,))
    y3 = model(x3)
    new_model = Model(x3, y3)
    new_model.compile('sgd', 'mse')
    new_model.train_on_batch(x, x)
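    # Extra check (an assumption added here, not in the original test): both
    # models wrap the same layer instance, so their inference outputs on the
    # same input should be identical.
    assert_allclose(model.predict(x), new_model.predict(x), atol=1e-7)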


def test_that_trainable_disables_updates():
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    a = Input(shape=(4,))
    layer = normalization.BatchNormalization(input_shape=(4,))
    b = layer(a)
    model = Model(a, b)

    # A frozen model should carry no moving-average updates.
    model.trainable = False
    assert not model.updates

    model.compile('sgd', 'mse')
    assert not model.updates

    # Training a frozen model must leave its predictions unchanged.
    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert_allclose(x1, x2, atol=1e-7)

    # The trainable flag is only picked up at compile time, hence the
    # recompile after each change to it.
    model.trainable = True
    model.compile('sgd', 'mse')
    assert model.updates

    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert np.abs(np.sum(x1 - x2)) > 1e-5

    # Freezing the layer directly (rather than the model) must also work.
    layer.trainable = False
    model.compile('sgd', 'mse')
    assert not model.updates

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert_allclose(x1, x2, atol=1e-7)


def test_batchnorm_trainable():
    bn_mean = 0.5
    bn_std = 10.

    def get_model(bn_mean, bn_std):
        inp = Input(shape=(1,))
        x = normalization.BatchNormalization()(inp)
        model = Model(inp, x)
        # Weights are [gamma, beta, moving_mean, moving_variance].
        model.set_weights([np.array([1.]), np.array([0.]),
                           np.array([bn_mean]), np.array([bn_std ** 2])])
        return model

    # Simulates training mode with a trainable layer:
    # the layer should use mini-batch statistics.
    K.set_learning_phase(1)
    model = get_model(bn_mean, bn_std)
    model.compile(loss='mse', optimizer='rmsprop')
    out = model.predict(input_4)
    assert_allclose((input_4 - np.mean(input_4)) / np.std(input_4),
                    out, atol=1e-3)
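    # Companion check (an addition, not in the original test): in inference
    # mode the layer should normalize with the stored moving statistics
    # instead, i.e. (x - bn_mean) / bn_std given gamma=1 and beta=0.
    K.set_learning_phase(0)
    model = get_model(bn_mean, bn_std)
    out = model.predict(input_4)
    assert_allclose((input_4 - bn_mean) / bn_std, out, atol=1e-3)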


if __name__ == '__main__':
    pytest.main([__file__])