1from __future__ import absolute_import
2from __future__ import print_function
3import pytest
4import os
5import numpy as np
6from numpy.testing import assert_allclose
7
8from keras import backend as K
9import keras
10from keras.models import Sequential
11from keras.layers import Dense, Activation
12from keras.utils import np_utils
13from keras.utils.test_utils import get_test_data
14from keras.models import model_from_json, model_from_yaml
15from keras import losses
16
17
# Shared model dimensions and training settings used by the tests below.
input_dim = 16  # number of input features per sample
num_hidden = 8  # units in the hidden Dense layer
num_classes = 4  # units in the output layer / number of target classes
batch_size = 32  # samples per batch passed to fit and the generators
epochs = 1  # epochs per fit / fit_generator call
23
24
def make_batches(size, batch_size):
    """Splits `size` items into consecutive `(start, end)` index ranges.

    # Arguments
        size: Integer, total number of items to split into batches.
        batch_size: Integer, maximum number of items per batch.

    # Returns
        A list of `(start, end)` tuples; the final tuple is clipped
        to `size` when `size` is not a multiple of `batch_size`.
    """
    # Ceiling division, so a trailing partial batch still gets an entry.
    num_batches = (size + batch_size - 1) // batch_size
    starts = (index * batch_size for index in range(num_batches))
    return [(start, min(size, start + batch_size)) for start in starts]
36
37
@pytest.fixture
def in_tmpdir(tmpdir):
    """Runs the requesting test with `tmpdir` as the working directory.

    Once the test has finished, asserts that nothing was left behind
    in that directory.
    """
    with tmpdir.as_cwd():
        yield
    leftovers = tmpdir.listdir()
    assert not leftovers
47
48
def test_sequential_pop():
    """Checks that `pop()` drops the last layer and the model still trains."""
    model = Sequential([
        Dense(num_hidden, input_dim=input_dim),
        Dense(num_classes),
    ])
    model.compile(loss='mse', optimizer='sgd')
    inputs = np.random.random((batch_size, input_dim))
    targets = np.random.random((batch_size, num_classes))
    model.fit(inputs, targets, epochs=1)

    # Remove the output layer; the hidden layer is now the model output.
    model.pop()
    remaining_layers = model.layers
    assert len(remaining_layers) == 1
    assert model.output_shape == (None, num_hidden)

    # Recompile and fit against targets shaped like the new output.
    model.compile(loss='mse', optimizer='sgd')
    targets = np.random.random((batch_size, num_hidden))
    model.fit(inputs, targets, epochs=1)
63
64
def _get_test_data():
    """Builds a seeded classification dataset with categorical labels.

    # Returns
        A tuple `((x_train, y_train), (x_test, y_test))` with 100 training
        and 50 test samples; the labels are passed through
        `np_utils.to_categorical`.
    """
    # Fixed seed so every test sees the same data.
    np.random.seed(1234)

    (x_train, y_train), (x_test, y_test) = get_test_data(
        num_train=100,
        num_test=50,
        input_shape=(input_dim,),
        classification=True,
        num_classes=num_classes)

    test_split = (x_test, np_utils.to_categorical(y_test))
    train_split = (x_train, np_utils.to_categorical(y_train))
    return train_split, test_split
79
80
def test_sequential_fit_generator():
    """Exercises `fit_generator` with the different validation options."""
    (x_train, y_train), (x_test, y_test) = _get_test_data()

    def data_generator(train):
        """Endlessly yields full-size batches from the train or test split."""
        if train:
            features, labels = x_train, y_train
        else:
            features, labels = x_test, y_test
        # Only whole batches are served; the index wraps around after them.
        max_batch_index = len(features) // batch_size
        i = 0
        while True:
            start = i * batch_size
            stop = (i + 1) * batch_size
            yield (features[start:stop], labels[start:stop])
            i = (i + 1) % max_batch_index

    model = Sequential()
    model.add(Dense(num_hidden, input_shape=(input_dim,)))
    model.add(Activation('relu'))
    model.add(Dense(num_classes))
    # Pop and re-add the output layer before compiling.
    model.pop()
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop')

    # No validation.
    model.fit_generator(data_generator(True), steps_per_epoch=5,
                        epochs=epochs)
    # Validation on in-memory arrays.
    model.fit_generator(data_generator(True), steps_per_epoch=5,
                        epochs=epochs,
                        validation_data=(x_test, y_test))
    # Validation on a second generator.
    model.fit_generator(data_generator(True), steps_per_epoch=5,
                        epochs=epochs,
                        validation_data=data_generator(False),
                        validation_steps=3)
    # Smaller generator queue.
    model.fit_generator(data_generator(True), steps_per_epoch=5,
                        epochs=epochs, max_queue_size=2)
    model.evaluate(x_train, y_train)
117
118
def test_sequential(in_tmpdir):
    """End-to-end check of a small `Sequential` classifier.

    Covers `fit` with several option combinations, `train_on_batch`,
    array-based versus generator-based evaluation and prediction (which
    must agree), a weight save/load round trip, and config / JSON / YAML
    serialization.

    # Arguments
        in_tmpdir: Fixture that runs the test inside a temporary
            directory, so the weights file is written there.
    """
    (x_train, y_train), (x_test, y_test) = _get_test_data()

    # TODO: factor out
    def data_generator(x, y, batch_size=50):
        """Endlessly yields `(x_batch, y_batch)` slices of `x` and `y`."""
        index_array = np.arange(len(x))
        while 1:
            # Batch over the arrays that were passed in rather than the
            # enclosing `x_test`, so the generator is right for any input.
            batches = make_batches(len(x), batch_size)
            for batch_start, batch_end in batches:
                batch_ids = index_array[batch_start:batch_end]
                x_batch = x[batch_ids]
                y_batch = y[batch_ids]
                yield (x_batch, y_batch)

    model = Sequential()
    model.add(Dense(num_hidden, input_shape=(input_dim,)))
    model.add(Activation('relu'))
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop')

    # Fit with each verbosity level and with/without validation/shuffling.
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1,
              validation_data=(x_test, y_test))
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=2,
              validation_split=0.1)
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=0)
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1,
              shuffle=False)

    model.train_on_batch(x_train[:32], y_train[:32])

    loss_np = model.evaluate(x_test, y_test)
    predict_np = model.predict(x_test)

    # One step with the default batch size of 50 covers the 50 test samples,
    # so the generator results are comparable with the array results.
    generator_pred_np = model.predict_generator(
        data_generator(x_test, y_test), 1,
        max_queue_size=2, verbose=1)
    generator_loss_np = model.evaluate_generator(
        data_generator(x_test, y_test, 50), 1,
        max_queue_size=2)

    assert_allclose(loss_np, generator_loss_np, atol=1e-5)
    assert_allclose(predict_np, generator_pred_np, atol=1e-5)

    model.predict(x_test, verbose=0)
    model.predict_classes(x_test, verbose=0)
    model.predict_proba(x_test, verbose=0)

    # Save the weights, rebuild the same architecture and reload them.
    fname = 'test_sequential_temp.h5'
    model.save_weights(fname, overwrite=True)
    model = Sequential()
    model.add(Dense(num_hidden, input_shape=(input_dim,)))
    model.add(Activation('relu'))
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
    model.load_weights(fname)
    # Remove the file so the `in_tmpdir` emptiness check passes.
    os.remove(fname)

    # The reloaded model must reproduce the loss measured before saving.
    nloss = model.evaluate(x_test, y_test, verbose=0)
    assert(loss_np == nloss)

    # Test serialization
    config = model.get_config()
    assert 'name' in config
    new_model = Sequential.from_config(config)
    assert new_model.weights  # Model should be built.

    model.summary()
    json_str = model.to_json()
    model_from_json(json_str)

    yaml_str = model.to_yaml()
    model_from_yaml(yaml_str)
193
194
def test_nested_sequential(in_tmpdir):
    """End-to-end check of a `Sequential` nested two levels deep.

    Trains the model, round-trips its weights through a file and checks
    that a freshly built copy reproduces the same loss, then exercises
    config / JSON / YAML serialization.

    # Arguments
        in_tmpdir: Fixture that runs the test inside a temporary
            directory, so the weights file is written there.
    """
    (x_train, y_train), (x_test, y_test) = _get_test_data()

    def build_model():
        """Returns a compiled classifier wrapping two nested Sequentials."""
        inner = Sequential()
        inner.add(Dense(num_hidden, input_shape=(input_dim,)))
        inner.add(Activation('relu'))
        inner.add(Dense(num_classes))

        middle = Sequential()
        middle.add(inner)

        outer = Sequential()
        outer.add(middle)
        outer.add(Activation('softmax'))
        outer.compile(loss='categorical_crossentropy', optimizer='rmsprop')
        return outer

    model = build_model()

    # Same training data each time; only the extra options vary.
    fit_variants = [
        dict(verbose=1, validation_data=(x_test, y_test)),
        dict(verbose=2, validation_split=0.1),
        dict(verbose=0),
        dict(verbose=1, shuffle=False),
    ]
    for extra_options in fit_variants:
        model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
                  **extra_options)

    model.train_on_batch(x_train[:32], y_train[:32])

    loss = model.evaluate(x_test, y_test, verbose=0)

    model.predict(x_test, verbose=0)
    model.predict_classes(x_test, verbose=0)
    model.predict_proba(x_test, verbose=0)

    fname = 'test_nested_sequential_temp.h5'
    model.save_weights(fname, overwrite=True)

    # Rebuild the architecture from scratch and restore the saved weights.
    model = build_model()
    model.load_weights(fname)
    os.remove(fname)

    nloss = model.evaluate(x_test, y_test, verbose=0)
    assert loss == nloss

    # Test serialization
    Sequential.from_config(model.get_config())

    model.summary()
    model_from_json(model.to_json())
    model_from_yaml(model.to_yaml())
258
259
def test_sequential_count_params():
    """Checks `count_params()` before and after compiling the model."""
    input_dim = 20
    num_units = 10
    num_classes = 2

    # Each Dense layer contributes a kernel (fan_in * fan_out) and a bias.
    dense_shapes = [
        (input_dim, num_units),
        (num_units, num_units),
        (num_units, num_classes),
    ]
    expected = sum(fan_in * fan_out + fan_out
                   for fan_in, fan_out in dense_shapes)

    model = Sequential()
    model.add(Dense(num_units, input_shape=(input_dim,)))
    model.add(Dense(num_units))
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    model.build()

    assert model.count_params() == expected

    model.compile('sgd', 'binary_crossentropy')
    assert model.count_params() == expected
280
281
def test_nested_sequential_trainability():
    """Checks that toggling `trainable` on a nested model is reflected in
    the outer model's `trainable_weights`.
    """
    input_dim = 20
    num_units = 10
    num_classes = 2

    inner_model = Sequential()
    inner_model.add(Dense(num_units, input_shape=(input_dim,)))

    model = Sequential()
    model.add(inner_model)
    model.add(Dense(num_classes))

    assert len(model.trainable_weights) == 4
    # Freeze the inner model, then unfreeze it again.
    for inner_trainable, expected_count in ((False, 2), (True, 4)):
        inner_model.trainable = inner_trainable
        assert len(model.trainable_weights) == expected_count
299
300
def test_rebuild_model():
    """Checks that the last layer's output shape tracks newly added layers."""
    model = Sequential()
    model.add(Dense(128, input_shape=(784,)))
    model.add(Dense(64))
    last_layer = model.get_layer(index=-1)
    assert last_layer.output_shape == (None, 64)

    # Adding another layer must make it the new last layer.
    model.add(Dense(32))
    last_layer = model.get_layer(index=-1)
    assert last_layer.output_shape == (None, 32)
309
310
def test_clone_functional_model():
    """Clones a two-input functional model and trains each clone.

    The clone is created once with fresh placeholders and once on top of
    explicitly supplied Keras input tensors.
    """
    val_a = np.random.random((10, 4))
    val_b = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    input_a = keras.Input(shape=(4,))
    input_b = keras.Input(shape=(4,))
    shared_dense = keras.layers.Dense(4)
    second_dense = keras.layers.Dense(4)

    # `shared_dense` is applied to both inputs; the layer call order below
    # is kept deliberately.
    branch_a = shared_dense(input_a)
    branch_a = keras.layers.Dropout(0.5)(branch_a)
    branch_a = keras.layers.BatchNormalization()(branch_a)
    branch_b = shared_dense(input_b)
    branch_a = second_dense(branch_a)
    merged = keras.layers.add([branch_a, branch_b])
    model = keras.models.Model([input_a, input_b], merged)

    if K.backend() == 'tensorflow':
        # Everything should work in a new session.
        K.clear_session()

    # With placeholder creation
    clone = keras.models.clone_model(model)
    clone.compile('rmsprop', 'mse')
    clone.train_on_batch([val_a, val_b], val_out)

    # On top of new tensors
    new_inputs = [keras.Input(shape=(4,), name='a'),
                  keras.Input(shape=(4,), name='b')]
    clone = keras.models.clone_model(model, input_tensors=new_inputs)
    clone.compile('rmsprop', 'mse')
    clone.train_on_batch([val_a, val_b], val_out)

    # # On top of new, non-Keras tensors
    # input_a = keras.backend.variable(val_a)
    # input_b = keras.backend.variable(val_b)
    # new_model = keras.models.clone_model(
    #     model, input_tensors=[input_a, input_b])
    # new_model.compile('rmsprop', 'mse')
    # new_model.train_on_batch(None, val_out)
353
354
def test_clone_functional_model_with_multi_outputs():
    """Checks that cloning a model built from multi-output layers
    preserves its predictions.

    The model consists of a `Lambda` layer with one input and two outputs
    followed by a custom layer that swaps its two inputs.
    """
    input_layer = keras.Input(shape=(4,))

    # Layer with single input and multiple outputs
    layer1 = keras.layers.Lambda(lambda x: [x + 1, x],
                                 lambda shapes: [shapes, shapes])
    x_a, x_b = layer1(input_layer)

    class SwapLayer(keras.layers.Layer):
        """Returns its two inputs in reversed order."""

        def call(self, inputs, **kwargs):
            return [inputs[1], inputs[0]]

        def compute_output_shape(self, input_shape):
            return [input_shape[1], input_shape[0]]

    # Layer with multiple inputs and outputs
    x_a, x_b = SwapLayer()([x_a, x_b])
    model = keras.Model(inputs=[input_layer], outputs=[x_a, x_b])
    new_model = keras.models.clone_model(model)

    x_test = np.random.random((10, 4))
    pred_a, pred_b = model.predict(x_test)
    pred_new_a, pred_new_b = new_model.predict(x_test)
    # Compare the arrays element-wise. `a.all() == b.all()` would only
    # compare two booleans and pass for almost any pair of outputs.
    assert_allclose(pred_a, pred_new_a, rtol=1e-5)
    assert_allclose(pred_b, pred_new_b, rtol=1e-5)
380
381
def test_clone_sequential_model():
    """Clones a `Sequential` model and trains each clone.

    The clone is created once with a fresh placeholder and once on top of
    an explicitly supplied Keras input tensor.
    """
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    model = keras.models.Sequential([
        keras.layers.Dense(4, input_shape=(4,)),
        keras.layers.BatchNormalization(),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(4),
    ])

    if K.backend() == 'tensorflow':
        # Everything should work in a new session.
        K.clear_session()

    # With placeholder creation
    clone = keras.models.clone_model(model)
    clone.compile('rmsprop', 'mse')
    clone.train_on_batch(val_a, val_out)

    # On top of new tensor
    new_input = keras.Input(shape=(4,))
    clone = keras.models.clone_model(model, input_tensors=new_input)
    clone.compile('rmsprop', 'mse')
    clone.train_on_batch(val_a, val_out)

    # # On top of new, non-Keras tensor
    # input_a = keras.backend.variable(val_a)
    # new_model = keras.models.clone_model(
    #     model, input_tensors=input_a)
    # new_model.compile('rmsprop', 'mse')
    # new_model.train_on_batch(None, val_out)
414
415
def test_sequential_update_disabling():
    """Checks that `trainable = False` disables a model's updates.

    While the model is frozen it must expose no updates and a training
    step must leave its predictions unchanged. After it is unfrozen and
    recompiled, updates must be present and a training step must change
    the predictions.
    """
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    model = keras.models.Sequential()
    model.add(keras.layers.BatchNormalization(input_shape=(4,)))

    model.trainable = False
    assert not model.updates

    model.compile('sgd', 'mse')
    assert not model.updates

    # Frozen: a training step must not alter the predictions.
    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert_allclose(x1, x2, atol=1e-7)

    model.trainable = True
    model.compile('sgd', 'mse')
    assert model.updates

    # Unfrozen: a training step must alter the predictions.
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    # Sum the absolute differences so that positive and negative changes
    # cannot cancel each other out and hide a real change.
    assert np.sum(np.abs(x1 - x2)) > 1e-5
441
442
def test_sequential_deferred_build():
    """Checks that a `Sequential` without an input shape is built on its
    first training batch, and that its config restores a built model.
    """
    def assert_built(candidate):
        """Asserts the model is built with 2 layers and 4 weights."""
        assert candidate.built is True
        assert len(candidate.layers) == 2
        assert len(candidate.weights) == 4

    model = keras.models.Sequential()
    model.add(keras.layers.Dense(3))
    model.add(keras.layers.Dense(3))
    model.compile('sgd', 'mse')

    # No input shape was given, so nothing is built yet.
    assert model.built is False
    assert len(model.layers) == 2

    batch_x = np.random.random((2, 4))
    batch_y = np.random.random((2, 3))
    model.train_on_batch(batch_x, batch_y)

    assert_built(model)

    # Test serialization
    config = model.get_config()
    assert 'name' in config
    assert_built(Sequential.from_config(config))
466
467
def test_nested_sequential_deferred_build():
    """Checks deferred building for a `Sequential` nested in another one,
    including the model restored from the outer model's config.
    """
    def assert_built(candidate, num_weights):
        """Asserts the model is built with 2 layers and `num_weights`."""
        assert candidate.built is True
        assert len(candidate.layers) == 2
        assert len(candidate.weights) == num_weights

    inner_model = keras.models.Sequential()
    inner_model.add(keras.layers.Dense(3))
    inner_model.add(keras.layers.Dense(3))

    model = keras.models.Sequential()
    model.add(inner_model)
    model.add(keras.layers.Dense(5))
    model.compile('sgd', 'mse')

    # No input shape was given, so neither model is built yet.
    assert inner_model.built is False
    assert len(inner_model.layers) == 2
    assert model.built is False
    assert len(model.layers) == 2

    batch_x = np.random.random((2, 4))
    batch_y = np.random.random((2, 5))
    model.train_on_batch(batch_x, batch_y)

    assert_built(inner_model, 4)
    assert_built(model, 6)

    # Round trip through the config.
    restored = keras.models.Sequential.from_config(model.get_config())
    assert_built(restored, 6)

    restored_inner = restored.layers[0]
    assert_built(restored_inner, 4)
503
504
# Run this module's tests through pytest when it is executed as a script.
if __name__ == '__main__':
    pytest.main([__file__])
507