1from __future__ import absolute_import 2from __future__ import print_function 3import pytest 4import os 5import numpy as np 6from numpy.testing import assert_allclose 7 8from keras import backend as K 9import keras 10from keras.models import Sequential 11from keras.layers import Dense, Activation 12from keras.utils import np_utils 13from keras.utils.test_utils import get_test_data 14from keras.models import model_from_json, model_from_yaml 15from keras import losses 16 17 18input_dim = 16 19num_hidden = 8 20num_classes = 4 21batch_size = 32 22epochs = 1 23 24 25def make_batches(size, batch_size): 26 """Returns a list of batch indices (tuples of indices). 27 # Arguments 28 size: Integer, total size of the data to slice into batches. 29 batch_size: Integer, batch size. 30 # Returns 31 A list of tuples of array indices. 32 """ 33 num_batches = (size + batch_size - 1) // batch_size # round up 34 return [(i * batch_size, min(size, (i + 1) * batch_size)) 35 for i in range(num_batches)] 36 37 38@pytest.fixture 39def in_tmpdir(tmpdir): 40 """Runs a function in a temporary directory. 41 42 Checks that the directory is empty afterwards. 43 """ 44 with tmpdir.as_cwd(): 45 yield None 46 assert not tmpdir.listdir() 47 48 49def test_sequential_pop(): 50 model = Sequential() 51 model.add(Dense(num_hidden, input_dim=input_dim)) 52 model.add(Dense(num_classes)) 53 model.compile(loss='mse', optimizer='sgd') 54 x = np.random.random((batch_size, input_dim)) 55 y = np.random.random((batch_size, num_classes)) 56 model.fit(x, y, epochs=1) 57 model.pop() 58 assert len(model.layers) == 1 59 assert model.output_shape == (None, num_hidden) 60 model.compile(loss='mse', optimizer='sgd') 61 y = np.random.random((batch_size, num_hidden)) 62 model.fit(x, y, epochs=1) 63 64 65def _get_test_data(): 66 np.random.seed(1234) 67 68 train_samples = 100 69 test_samples = 50 70 71 (x_train, y_train), (x_test, y_test) = get_test_data(num_train=train_samples, 72 num_test=test_samples, 73 input_shape=(input_dim,), 74 classification=True, 75 num_classes=num_classes) 76 y_test = np_utils.to_categorical(y_test) 77 y_train = np_utils.to_categorical(y_train) 78 return (x_train, y_train), (x_test, y_test) 79 80 81def test_sequential_fit_generator(): 82 (x_train, y_train), (x_test, y_test) = _get_test_data() 83 84 def data_generator(train): 85 if train: 86 max_batch_index = len(x_train) // batch_size 87 else: 88 max_batch_index = len(x_test) // batch_size 89 i = 0 90 while 1: 91 if train: 92 yield (x_train[i * batch_size: (i + 1) * batch_size], 93 y_train[i * batch_size: (i + 1) * batch_size]) 94 else: 95 yield (x_test[i * batch_size: (i + 1) * batch_size], 96 y_test[i * batch_size: (i + 1) * batch_size]) 97 i += 1 98 i = i % max_batch_index 99 100 model = Sequential() 101 model.add(Dense(num_hidden, input_shape=(input_dim,))) 102 model.add(Activation('relu')) 103 model.add(Dense(num_classes)) 104 model.pop() 105 model.add(Dense(num_classes)) 106 model.add(Activation('softmax')) 107 model.compile(loss='categorical_crossentropy', optimizer='rmsprop') 108 109 model.fit_generator(data_generator(True), 5, epochs) 110 model.fit_generator(data_generator(True), 5, epochs, 111 validation_data=(x_test, y_test)) 112 model.fit_generator(data_generator(True), 5, epochs, 113 validation_data=data_generator(False), 114 validation_steps=3) 115 model.fit_generator(data_generator(True), 5, epochs, max_queue_size=2) 116 model.evaluate(x_train, y_train) 117 118 119def test_sequential(in_tmpdir): 120 (x_train, y_train), (x_test, y_test) = _get_test_data() 121 122 # TODO: factor out 123 def data_generator(x, y, batch_size=50): 124 index_array = np.arange(len(x)) 125 while 1: 126 batches = make_batches(len(x_test), batch_size) 127 for batch_index, (batch_start, batch_end) in enumerate(batches): 128 batch_ids = index_array[batch_start:batch_end] 129 x_batch = x[batch_ids] 130 y_batch = y[batch_ids] 131 yield (x_batch, y_batch) 132 133 model = Sequential() 134 model.add(Dense(num_hidden, input_shape=(input_dim,))) 135 model.add(Activation('relu')) 136 model.add(Dense(num_classes)) 137 model.add(Activation('softmax')) 138 model.compile(loss='categorical_crossentropy', optimizer='rmsprop') 139 140 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, 141 validation_data=(x_test, y_test)) 142 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=2, 143 validation_split=0.1) 144 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0) 145 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, 146 shuffle=False) 147 148 model.train_on_batch(x_train[:32], y_train[:32]) 149 150 loss_np = model.evaluate(x_test, y_test) 151 predict_np = model.predict(x_test) 152 153 generator_pred_np = model.predict_generator( 154 data_generator(x_test, y_test), 1, 155 max_queue_size=2, verbose=1) 156 generator_loss_np = model.evaluate_generator( 157 data_generator(x_test, y_test, 50), 1, 158 max_queue_size=2) 159 160 assert_allclose(loss_np, generator_loss_np, atol=1e-5) 161 assert_allclose(predict_np, generator_pred_np, atol=1e-5) 162 163 model.predict(x_test, verbose=0) 164 model.predict_classes(x_test, verbose=0) 165 model.predict_proba(x_test, verbose=0) 166 167 fname = 'test_sequential_temp.h5' 168 model.save_weights(fname, overwrite=True) 169 model = Sequential() 170 model.add(Dense(num_hidden, input_shape=(input_dim,))) 171 model.add(Activation('relu')) 172 model.add(Dense(num_classes)) 173 model.add(Activation('softmax')) 174 model.compile(loss='categorical_crossentropy', optimizer='rmsprop') 175 model.load_weights(fname) 176 os.remove(fname) 177 178 nloss = model.evaluate(x_test, y_test, verbose=0) 179 assert(loss_np == nloss) 180 181 # Test serialization 182 config = model.get_config() 183 assert 'name' in config 184 new_model = Sequential.from_config(config) 185 assert new_model.weights # Model should be built. 186 187 model.summary() 188 json_str = model.to_json() 189 model_from_json(json_str) 190 191 yaml_str = model.to_yaml() 192 model_from_yaml(yaml_str) 193 194 195def test_nested_sequential(in_tmpdir): 196 (x_train, y_train), (x_test, y_test) = _get_test_data() 197 198 inner = Sequential() 199 inner.add(Dense(num_hidden, input_shape=(input_dim,))) 200 inner.add(Activation('relu')) 201 inner.add(Dense(num_classes)) 202 203 middle = Sequential() 204 middle.add(inner) 205 206 model = Sequential() 207 model.add(middle) 208 model.add(Activation('softmax')) 209 model.compile(loss='categorical_crossentropy', optimizer='rmsprop') 210 211 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, 212 validation_data=(x_test, y_test)) 213 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=2, 214 validation_split=0.1) 215 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0) 216 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, 217 shuffle=False) 218 219 model.train_on_batch(x_train[:32], y_train[:32]) 220 221 loss = model.evaluate(x_test, y_test, verbose=0) 222 223 model.predict(x_test, verbose=0) 224 model.predict_classes(x_test, verbose=0) 225 model.predict_proba(x_test, verbose=0) 226 227 fname = 'test_nested_sequential_temp.h5' 228 model.save_weights(fname, overwrite=True) 229 230 inner = Sequential() 231 inner.add(Dense(num_hidden, input_shape=(input_dim,))) 232 inner.add(Activation('relu')) 233 inner.add(Dense(num_classes)) 234 235 middle = Sequential() 236 middle.add(inner) 237 238 model = Sequential() 239 model.add(middle) 240 model.add(Activation('softmax')) 241 model.compile(loss='categorical_crossentropy', optimizer='rmsprop') 242 model.load_weights(fname) 243 os.remove(fname) 244 245 nloss = model.evaluate(x_test, y_test, verbose=0) 246 assert(loss == nloss) 247 248 # Test serialization 249 config = model.get_config() 250 Sequential.from_config(config) 251 252 model.summary() 253 json_str = model.to_json() 254 model_from_json(json_str) 255 256 yaml_str = model.to_yaml() 257 model_from_yaml(yaml_str) 258 259 260def test_sequential_count_params(): 261 input_dim = 20 262 num_units = 10 263 num_classes = 2 264 265 n = input_dim * num_units + num_units 266 n += num_units * num_units + num_units 267 n += num_units * num_classes + num_classes 268 269 model = Sequential() 270 model.add(Dense(num_units, input_shape=(input_dim,))) 271 model.add(Dense(num_units)) 272 model.add(Dense(num_classes)) 273 model.add(Activation('softmax')) 274 model.build() 275 276 assert(n == model.count_params()) 277 278 model.compile('sgd', 'binary_crossentropy') 279 assert(n == model.count_params()) 280 281 282def test_nested_sequential_trainability(): 283 input_dim = 20 284 num_units = 10 285 num_classes = 2 286 287 inner_model = Sequential() 288 inner_model.add(Dense(num_units, input_shape=(input_dim,))) 289 290 model = Sequential() 291 model.add(inner_model) 292 model.add(Dense(num_classes)) 293 294 assert len(model.trainable_weights) == 4 295 inner_model.trainable = False 296 assert len(model.trainable_weights) == 2 297 inner_model.trainable = True 298 assert len(model.trainable_weights) == 4 299 300 301def test_rebuild_model(): 302 model = Sequential() 303 model.add(Dense(128, input_shape=(784,))) 304 model.add(Dense(64)) 305 assert(model.get_layer(index=-1).output_shape == (None, 64)) 306 307 model.add(Dense(32)) 308 assert(model.get_layer(index=-1).output_shape == (None, 32)) 309 310 311def test_clone_functional_model(): 312 val_a = np.random.random((10, 4)) 313 val_b = np.random.random((10, 4)) 314 val_out = np.random.random((10, 4)) 315 316 input_a = keras.Input(shape=(4,)) 317 input_b = keras.Input(shape=(4,)) 318 dense_1 = keras.layers.Dense(4) 319 dense_2 = keras.layers.Dense(4) 320 321 x_a = dense_1(input_a) 322 x_a = keras.layers.Dropout(0.5)(x_a) 323 x_a = keras.layers.BatchNormalization()(x_a) 324 x_b = dense_1(input_b) 325 x_a = dense_2(x_a) 326 outputs = keras.layers.add([x_a, x_b]) 327 model = keras.models.Model([input_a, input_b], outputs) 328 329 if K.backend() == 'tensorflow': 330 # Everything should work in a new session. 331 K.clear_session() 332 333 # With placeholder creation 334 new_model = keras.models.clone_model(model) 335 new_model.compile('rmsprop', 'mse') 336 new_model.train_on_batch([val_a, val_b], val_out) 337 338 # On top of new tensors 339 input_a = keras.Input(shape=(4,), name='a') 340 input_b = keras.Input(shape=(4,), name='b') 341 new_model = keras.models.clone_model( 342 model, input_tensors=[input_a, input_b]) 343 new_model.compile('rmsprop', 'mse') 344 new_model.train_on_batch([val_a, val_b], val_out) 345 346 # # On top of new, non-Keras tensors 347 # input_a = keras.backend.variable(val_a) 348 # input_b = keras.backend.variable(val_b) 349 # new_model = keras.models.clone_model( 350 # model, input_tensors=[input_a, input_b]) 351 # new_model.compile('rmsprop', 'mse') 352 # new_model.train_on_batch(None, val_out) 353 354 355def test_clone_functional_model_with_multi_outputs(): 356 input_layer = keras.Input(shape=(4,)) 357 358 # Layer with single input and multiple outputs 359 layer1 = keras.layers.Lambda(lambda x: [x + 1, x], 360 lambda shapes: [shapes, shapes]) 361 x_a, x_b = layer1(input_layer) 362 363 class SwapLayer(keras.layers.Layer): 364 def call(self, inputs, **kwargs): 365 return [inputs[1], inputs[0]] 366 367 def compute_output_shape(self, input_shape): 368 return [input_shape[1], input_shape[0]] 369 370 # Layer with multiple inputs and outputs 371 x_a, x_b = SwapLayer()([x_a, x_b]) 372 model = keras.Model(inputs=[input_layer], outputs=[x_a, x_b]) 373 new_model = keras.models.clone_model(model) 374 375 x_test = np.random.random((10, 4)) 376 pred_a, pred_b = model.predict(x_test) 377 pred_new_a, pred_new_b = new_model.predict(x_test) 378 assert(pred_a.all() == pred_new_a.all()) 379 assert(pred_b.all() == pred_new_b.all()) 380 381 382def test_clone_sequential_model(): 383 val_a = np.random.random((10, 4)) 384 val_out = np.random.random((10, 4)) 385 386 model = keras.models.Sequential() 387 model.add(keras.layers.Dense(4, input_shape=(4,))) 388 model.add(keras.layers.BatchNormalization()) 389 model.add(keras.layers.Dropout(0.5)) 390 model.add(keras.layers.Dense(4)) 391 392 if K.backend() == 'tensorflow': 393 # Everything should work in a new session. 394 K.clear_session() 395 396 # With placeholder creation 397 new_model = keras.models.clone_model(model) 398 new_model.compile('rmsprop', 'mse') 399 new_model.train_on_batch(val_a, val_out) 400 401 # On top of new tensor 402 input_a = keras.Input(shape=(4,)) 403 new_model = keras.models.clone_model( 404 model, input_tensors=input_a) 405 new_model.compile('rmsprop', 'mse') 406 new_model.train_on_batch(val_a, val_out) 407 408 # # On top of new, non-Keras tensor 409 # input_a = keras.backend.variable(val_a) 410 # new_model = keras.models.clone_model( 411 # model, input_tensors=input_a) 412 # new_model.compile('rmsprop', 'mse') 413 # new_model.train_on_batch(None, val_out) 414 415 416def test_sequential_update_disabling(): 417 val_a = np.random.random((10, 4)) 418 val_out = np.random.random((10, 4)) 419 420 model = keras.models.Sequential() 421 model.add(keras.layers.BatchNormalization(input_shape=(4,))) 422 423 model.trainable = False 424 assert not model.updates 425 426 model.compile('sgd', 'mse') 427 assert not model.updates 428 429 x1 = model.predict(val_a) 430 model.train_on_batch(val_a, val_out) 431 x2 = model.predict(val_a) 432 assert_allclose(x1, x2, atol=1e-7) 433 434 model.trainable = True 435 model.compile('sgd', 'mse') 436 assert model.updates 437 438 model.train_on_batch(val_a, val_out) 439 x2 = model.predict(val_a) 440 assert np.abs(np.sum(x1 - x2)) > 1e-5 441 442 443def test_sequential_deferred_build(): 444 model = keras.models.Sequential() 445 model.add(keras.layers.Dense(3)) 446 model.add(keras.layers.Dense(3)) 447 model.compile('sgd', 'mse') 448 449 assert model.built is False 450 assert len(model.layers) == 2 451 452 model.train_on_batch( 453 np.random.random((2, 4)), np.random.random((2, 3))) 454 455 assert model.built is True 456 assert len(model.layers) == 2 457 assert len(model.weights) == 4 458 459 # Test serialization 460 config = model.get_config() 461 assert 'name' in config 462 new_model = Sequential.from_config(config) 463 assert new_model.built is True 464 assert len(new_model.layers) == 2 465 assert len(new_model.weights) == 4 466 467 468def test_nested_sequential_deferred_build(): 469 inner_model = keras.models.Sequential() 470 inner_model.add(keras.layers.Dense(3)) 471 inner_model.add(keras.layers.Dense(3)) 472 473 model = keras.models.Sequential() 474 model.add(inner_model) 475 model.add(keras.layers.Dense(5)) 476 model.compile('sgd', 'mse') 477 478 assert inner_model.built is False 479 assert len(inner_model.layers) == 2 480 assert model.built is False 481 assert len(model.layers) == 2 482 483 model.train_on_batch( 484 np.random.random((2, 4)), np.random.random((2, 5))) 485 486 assert inner_model.built is True 487 assert len(inner_model.layers) == 2 488 assert len(inner_model.weights) == 4 489 assert model.built is True 490 assert len(model.layers) == 2 491 assert len(model.weights) == 6 492 493 config = model.get_config() 494 new_model = keras.models.Sequential.from_config(config) 495 assert new_model.built is True 496 assert len(new_model.layers) == 2 497 assert len(new_model.weights) == 6 498 499 new_inner_model = new_model.layers[0] 500 assert new_inner_model.built is True 501 assert len(new_inner_model.layers) == 2 502 assert len(new_inner_model.weights) == 4 503 504 505if __name__ == '__main__': 506 pytest.main([__file__]) 507