import pytest
import numpy as np
from numpy.testing import assert_allclose

from keras.layers import Input
from keras import regularizers
from keras.utils.test_utils import layer_test
from keras.layers import normalization
from keras.models import Sequential, Model
from keras import backend as K

# Module-level sample inputs. Only `input_4` (a (10, 1) float column holding
# 0..9) is referenced by the tests in this file; the others are kept in case
# they are used from elsewhere.
input_1 = np.arange(10)
input_2 = np.zeros(10)
input_3 = np.ones((10))
input_4 = np.expand_dims(np.arange(10.), axis=1)
input_shapes = [np.ones((10, 10)), np.ones((10, 10, 10))]


def test_basic_batchnorm():
    '''Run `layer_test` on BatchNormalization with several configurations.

    Covers regularized gamma/beta, a non-default axis, explicit
    initializers and (except on the theano backend) a layer with both
    `scale` and `center` disabled.
    '''
    layer_test(normalization.BatchNormalization,
               kwargs={'momentum': 0.9,
                       'epsilon': 0.1,
                       'gamma_regularizer': regularizers.l2(0.01),
                       'beta_regularizer': regularizers.l2(0.01)},
               input_shape=(3, 4, 2))
    layer_test(normalization.BatchNormalization,
               kwargs={'momentum': 0.9,
                       'epsilon': 0.1,
                       'axis': 1},
               input_shape=(1, 4, 1))
    layer_test(normalization.BatchNormalization,
               kwargs={'gamma_initializer': 'ones',
                       'beta_initializer': 'ones',
                       'moving_mean_initializer': 'zeros',
                       'moving_variance_initializer': 'ones'},
               input_shape=(3, 4, 2, 4))
    # The no-scale / no-center configuration is not exercised on theano.
    if K.backend() != 'theano':
        layer_test(normalization.BatchNormalization,
                   kwargs={'momentum': 0.9,
                           'epsilon': 0.1,
                           'axis': 1,
                           'scale': False,
                           'center': False},
                   input_shape=(3, 4, 2, 4))


def test_batchnorm_correctness_1d():
    '''Check that a fitted BN layer standardizes 2D (batch, features) data.

    After training, the learned affine transform (beta, gamma) is undone
    on the predictions, which must then have roughly zero mean and unit
    standard deviation.
    '''
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(input_shape=(10,), momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='rmsprop')

    # centered on 5.0, standard deviation 10.0 (i.e. variance 100.0)
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10))
    model.fit(x, x, epochs=5, verbose=0)
    out = model.predict(x)
    # Undo the learned shift and scale so only the normalization remains.
    out -= K.eval(norm.beta)
    out /= K.eval(norm.gamma)

    assert_allclose(out.mean(), 0.0, atol=1e-1)
    assert_allclose(out.std(), 1.0, atol=1e-1)


def test_batchnorm_correctness_2d():
    '''Check that BN along axis 1 standardizes 3D (batch, 10, 6) data.

    Statistics are verified per index of axis 1 after removing the
    learned beta/gamma.
    '''
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(axis=1, input_shape=(10, 6),
                                            momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='rmsprop')

    # centered on 5.0, standard deviation 10.0 (i.e. variance 100.0)
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10, 6))
    model.fit(x, x, epochs=5, verbose=0)
    out = model.predict(x)
    # beta/gamma are reshaped so they broadcast along axis 1 of `out`.
    out -= np.reshape(K.eval(norm.beta), (1, 10, 1))
    out /= np.reshape(K.eval(norm.gamma), (1, 10, 1))

    assert_allclose(out.mean(axis=(0, 2)), 0.0, atol=1.1e-1)
    assert_allclose(out.std(axis=(0, 2)), 1.0, atol=1.1e-1)


def test_batchnorm_training_argument():
    '''Check the effect of the `training` call argument on a BN layer.

    Calling the layer with `training=True` must register updates and
    produce standardized predictions that change after fitting; calling
    it with `training=False` must register no updates.
    '''
    np.random.seed(1337)
    bn1 = normalization.BatchNormalization(input_shape=(10,))
    x1 = Input(shape=(10,))
    y1 = bn1(x1, training=True)
    assert bn1.updates

    model1 = Model(x1, y1)
    x = np.random.normal(loc=5.0, scale=10.0, size=(20, 10))
    output_a = model1.predict(x)

    model1.compile(loss='mse', optimizer='rmsprop')
    model1.fit(x, x, epochs=1, verbose=0)
    output_b = model1.predict(x)
    # Take the absolute value before summing so that element-wise
    # differences of opposite sign cannot cancel each other out.
    assert np.sum(np.abs(output_a - output_b)) > 0.1
    assert_allclose(output_b.mean(), 0.0, atol=1e-1)
    assert_allclose(output_b.std(), 1.0, atol=1e-1)

    bn2 = normalization.BatchNormalization(input_shape=(10,))
    x2 = Input(shape=(10,))
    bn2(x2, training=False)
    assert not bn2.updates


def test_batchnorm_mode_twice():
    '''Fit and predict with two stacked BN layers (smoke test).'''
    # This is a regression test for issue #4881 with the old
    # batch normalization functions in the Theano backend.
    # Seeded like the other tests so the random input is reproducible.
    np.random.seed(1337)
    model = Sequential()
    model.add(normalization.BatchNormalization(input_shape=(10, 5, 5), axis=1))
    model.add(normalization.BatchNormalization(input_shape=(10, 5, 5), axis=1))
    model.compile(loss='mse', optimizer='sgd')

    x = np.random.normal(loc=5.0, scale=10.0, size=(20, 10, 5, 5))
    model.fit(x, x, epochs=1, verbose=0)
    model.predict(x)


def test_batchnorm_convnet():
    '''Check that BN along axis 1 standardizes 4D (batch, 3, 4, 4) data.

    Statistics are verified per index of axis 1 after removing the
    learned beta/gamma.
    '''
    np.random.seed(1337)
    model = Sequential()
    norm = normalization.BatchNormalization(axis=1, input_shape=(3, 4, 4),
                                            momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='sgd')

    # centered on 5.0, standard deviation 10.0 (i.e. variance 100.0)
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 3, 4, 4))
    model.fit(x, x, epochs=4, verbose=0)
    out = model.predict(x)
    # beta/gamma are reshaped so they broadcast along axis 1 of `out`.
    out -= np.reshape(K.eval(norm.beta), (1, 3, 1, 1))
    out /= np.reshape(K.eval(norm.gamma), (1, 3, 1, 1))

    assert_allclose(np.mean(out, axis=(0, 2, 3)), 0.0, atol=1e-1)
    assert_allclose(np.std(out, axis=(0, 2, 3)), 1.0, atol=1e-1)


@pytest.mark.skipif((K.backend() == 'theano'),
                    reason='Bug with theano backend')
def test_batchnorm_convnet_no_center_no_scale():
    '''Check standardization of 4D data with `center` and `scale` disabled.

    With no beta/gamma there is nothing to undo, so the raw predictions
    are checked directly.
    '''
    np.random.seed(1337)
    model = Sequential()
    # NOTE(review): the layer normalizes along axis=-1 while the assertions
    # below reduce over axes (0, 2, 3), i.e. per index of axis 1 - confirm
    # this difference is intended.
    norm = normalization.BatchNormalization(axis=-1, center=False, scale=False,
                                            input_shape=(3, 4, 4), momentum=0.8)
    model.add(norm)
    model.compile(loss='mse', optimizer='sgd')

    # centered on 5.0, standard deviation 10.0 (i.e. variance 100.0)
    x = np.random.normal(loc=5.0, scale=10.0, size=(1000, 3, 4, 4))
    model.fit(x, x, epochs=4, verbose=0)
    out = model.predict(x)

    assert_allclose(np.mean(out, axis=(0, 2, 3)), 0.0, atol=1e-1)
    assert_allclose(np.std(out, axis=(0, 2, 3)), 1.0, atol=1e-1)


def test_shared_batchnorm():
    '''Test that a BN layer can be shared
    across different data streams.
    '''
    # Seeded like the other tests so the random input is reproducible.
    np.random.seed(1337)

    # Test single layer reuse
    bn = normalization.BatchNormalization(input_shape=(10,))
    x1 = Input(shape=(10,))
    bn(x1)

    x2 = Input(shape=(10,))
    y2 = bn(x2)

    x = np.random.normal(loc=5.0, scale=10.0, size=(2, 10))
    model = Model(x2, y2)
    model.compile('sgd', 'mse')
    model.train_on_batch(x, x)

    # Test model-level reuse
    x3 = Input(shape=(10,))
    y3 = model(x3)
    new_model = Model(x3, y3)
    new_model.compile('sgd', 'mse')
    new_model.train_on_batch(x, x)


def test_that_trainable_disables_updates():
    '''Check that `trainable = False` removes the BN updates.

    The flag is toggled first on the model and then on the layer; while
    it is off, a training step must leave the predictions unchanged, and
    while it is on, a training step must change them.
    '''
    # Seeded like the other tests so the random data is reproducible.
    np.random.seed(1337)
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    a = Input(shape=(4,))
    layer = normalization.BatchNormalization(input_shape=(4,))
    b = layer(a)
    model = Model(a, b)

    # Frozen at model level: no updates before or after compiling.
    model.trainable = False
    assert not model.updates

    model.compile('sgd', 'mse')
    assert not model.updates

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert_allclose(x1, x2, atol=1e-7)

    # Unfrozen: updates are back and training changes the output.
    model.trainable = True
    model.compile('sgd', 'mse')
    assert model.updates

    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    # Take the absolute value before summing so that element-wise
    # differences of opposite sign cannot cancel each other out.
    assert np.sum(np.abs(x1 - x2)) > 1e-5

    # Frozen at layer level: same expectations as the model-level freeze.
    layer.trainable = False
    model.compile('sgd', 'mse')
    assert not model.updates

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert_allclose(x1, x2, atol=1e-7)


def test_batchnorm_trainable():
    '''Check that a trainable BN layer called in training mode normalizes
    with the statistics of the batch rather than its stored weights.
    '''
    bn_mean = 0.5
    bn_std = 10.

    def get_model(bn_mean, bn_std):
        '''Build a single-BN-layer model with hand-set weights.'''
        inputs = Input(shape=(1,))
        x = normalization.BatchNormalization()(inputs)
        model = Model(inputs, x)
        # Presumably ordered gamma, beta, moving mean, moving variance -
        # verify against BatchNormalization's weight order.
        model.set_weights([np.array([1.]), np.array([0.]),
                           np.array([bn_mean]), np.array([bn_std ** 2])])
        return model

    # Simulates training-mode with trainable layer.
    # Should use mini-batch statistics.
    model = get_model(bn_mean, bn_std)
    model.compile(loss='mse', optimizer='rmsprop')
    out = model(input_4, training=True).numpy()
    # Expected output is `input_4` standardized by its own mean and std.
    assert_allclose((input_4 - np.mean(input_4)) / np.std(input_4),
                    out, atol=1e-3)


if __name__ == '__main__':
    pytest.main([__file__])