import pytest
import json
import numpy as np

from keras.layers import Dense, Dropout, Conv2D, InputLayer
from keras import layers
from keras.engine import Input, Layer, get_source_inputs
from keras.models import Model, Sequential
from keras import backend as K
from keras.models import model_from_json, model_from_yaml
from keras.initializers import Constant
from tensorflow.python.keras.saving.hdf5_format import (
    preprocess_weights_for_loading)


def test_get_updates_for():
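    # Exercise `Layer.add_update` with an update conditional on the input
    # `a` and an unconditional one (inputs=None); the test only checks
    # that both registrations are accepted without raising.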
    a = Input(shape=(2,))
    dense_layer = Dense(1)
    dense_layer.add_update(0, inputs=a)
    dense_layer.add_update(1, inputs=None)


def test_get_losses_for():
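    # Same pattern for `Layer.add_loss`: one callable loss conditional on
    # `a` and one unconditional; both should be accepted without raising.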
    a = Input(shape=(2,))
    dense_layer = Dense(1)
    dense_layer.add_loss(lambda: 0, inputs=a)
    dense_layer.add_loss(lambda: 1, inputs=None)


def test_trainable_weights():
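    # Toggling `trainable` on a model, or on one of its layers, should
    # move every weight between `trainable_weights` and
    # `non_trainable_weights`, for functional and sequential models alike.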
    a = Input(shape=(2,))
    b = Dense(1)(a)
    model = Model(a, b)

    weights = model.weights
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    model.trainable = True
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.layers[1].trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    # sequential model
    model = Sequential()
    model.add(Dense(1, input_dim=2))
    weights = model.weights

    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    model.trainable = True
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.layers[0].trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights


def test_valid_compute_mask():
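    # Dense supports masking, and its `compute_mask` is the identity:
    # the incoming mask is passed through unchanged.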
    model = Sequential()
    model.add(Dense(1, input_dim=2))
    assert model.layers[0].supports_masking is True
    assert model.layers[0].compute_mask([model.input], [0., 1.]) == [0., 1.]


def test_invalid_compute_mask():
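    # Conv2D does not support masking: a None mask passes through as
    # None, while any non-None mask raises a TypeError.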
    model = Sequential()
    model.add(Conv2D(1, [2, 2], input_shape=[3, 3, 1]))
    assert model.layers[0].supports_masking is False
    assert model.layers[0].compute_mask([model.input], [None]) is None

    mask = np.array([[0., 1.], [1., 0.]])
    with pytest.raises(TypeError):
        model.layers[0].compute_mask([model.input], [mask])
    with pytest.raises(TypeError):
        model.layers[0].compute_mask([model.input], mask)


def test_get_layer():
    model = Sequential()
    model.add(Dense(1, input_dim=2))
    with pytest.raises(ValueError):
        model.get_layer(index=5)
    with pytest.raises(ValueError):
        model.get_layer(index=None)
    with pytest.raises(ValueError):
        model.get_layer(name='conv')


def DISABLED_test_learning_phase():
    a = Input(shape=(32,), name='input_a')
    b = Input(shape=(32,), name='input_b')

    a_2 = Dense(16, name='dense_1')(a)
    dp = Dropout(0.5, name='dropout')
    b_2 = dp(b)

    # test merge
    m = layers.concatenate([a_2, b_2])

    # test recursion
    model = Model([a, b], [a_2, b_2])
    print(model.input_spec)

    c = Input(shape=(32,), name='input_c')
    d = Input(shape=(32,), name='input_d')

    c_2, b_2 = model([c, d])

    # try actually running the graph
    fn = K.function(model.inputs + [K.learning_phase()], model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs_no_dp = fn([input_a_np, input_b_np, 0])
    fn_outputs_dp = fn([input_a_np, input_b_np, 1])
    # output a: nothing changes
    assert fn_outputs_no_dp[0].sum() == fn_outputs_dp[0].sum()
    # output b: dropout applied
    assert fn_outputs_no_dp[1].sum() != fn_outputs_dp[1].sum()


def test_layer_call_arguments():
    # Test the ability to pass and serialize arguments to `call`.
    inp = layers.Input(shape=(2,))
    x = layers.Dense(3)(inp)
    x = layers.Dropout(0.5)(x, training=True)
    model = Model(inp, x)

    # Test that the argument is kept when applying the model
    inp2 = layers.Input(shape=(2,))
    out2 = model(inp2)

    # Test that the argument is kept after loading a model
    config = model.get_config()
    model = Model.from_config(config)


def test_node_construction():
    ####################################################
    # test basics

    a = Input(shape=(32,), name='input_a')
    b = Input(shape=(32,), name='input_b')

    assert tuple(a.shape) == (None, 32)
    a_layer, a_node_index, a_tensor_index = a._keras_history
    b_layer, b_node_index, b_tensor_index = b._keras_history
    assert len(a_layer._inbound_nodes) == 1
    assert a_tensor_index == 0
    node = a_layer._inbound_nodes[a_node_index]
    assert node.outbound_layer == a_layer

    assert isinstance(node.inbound_layers, list)
    assert node.inbound_layers == []
    assert isinstance(node.input_tensors, list)
    assert node.input_tensors == [a]
    assert isinstance(node.input_shapes, list)
    assert node.input_shapes == [(None, 32)]

    assert isinstance(node.output_tensors, list)
    assert node.output_tensors == [a]
    assert isinstance(node.output_shapes, list)
    assert node.output_shapes == [(None, 32)]

    dense = Dense(16, name='dense_1')
    a_2 = dense(a)
    b_2 = dense(b)

    assert len(dense._inbound_nodes) == 2
    assert len(dense._outbound_nodes) == 0
    assert dense._inbound_nodes[0].inbound_layers == a_layer
    assert dense._inbound_nodes[0].outbound_layer == dense
    assert dense._inbound_nodes[1].inbound_layers == b_layer
    assert dense._inbound_nodes[1].outbound_layer == dense

    # test layer properties
    test_layer = Dense(16, name='test_layer')
    a_test = test_layer(a)
    assert K.int_shape(test_layer.kernel) == (32, 16)
    assert test_layer.input is a
    assert test_layer.output is a_test
    assert test_layer.input_mask is None
    assert test_layer.output_mask is None
    assert test_layer.input_shape == (None, 32)
    assert test_layer.output_shape == (None, 16)

    # with pytest.raises(AttributeError):
    #     dense.input
    # with pytest.raises(AttributeError):
    #     dense.output
    # with pytest.raises(AttributeError):
    #     dense.input_mask
    # with pytest.raises(AttributeError):
    #     dense.output_mask

    assert dense.get_input_at(0) is a
    assert dense.get_input_at(1) is b
    assert dense.get_output_at(0) is a_2
    assert dense.get_output_at(1) is b_2
    assert dense.get_input_shape_at(0) == (None, 32)
    assert dense.get_input_shape_at(1) == (None, 32)
    assert dense.get_output_shape_at(0) == (None, 16)
    assert dense.get_output_shape_at(1) == (None, 16)
    assert dense.get_input_mask_at(0) is None
    assert dense.get_input_mask_at(1) is None
    assert dense.get_output_mask_at(0) is None
    assert dense.get_output_mask_at(1) is None


def test_multi_input_layer():
    ####################################################
    # test multi-input layer
    a = Input(shape=(32,), name='input_a')
    b = Input(shape=(32,), name='input_b')

    dense = Dense(16, name='dense_1')
    a_2 = dense(a)
    b_2 = dense(b)

    merged = layers.concatenate([a_2, b_2], name='merge')
    assert tuple(merged.shape) == (None, 16 * 2)
    merge_layer, merge_node_index, merge_tensor_index = merged._keras_history

    assert merge_node_index == 0
    assert merge_tensor_index == 0

    assert len(merge_layer._inbound_nodes) == 1
    assert len(merge_layer._outbound_nodes) == 0

    assert len(merge_layer._inbound_nodes[0].input_tensors) == 2
    assert len(merge_layer._inbound_nodes[0].inbound_layers) == 2

    c = Dense(64, name='dense_2')(merged)
    d = Dense(5, name='dense_3')(c)

    model = Model(inputs=[a, b], outputs=[c, d], name='model')
    assert len(model.layers) == 6
    expected_shapes = [(None, 64), (None, 5)]
    assert [tuple(s) for s in model.compute_output_shape(
        [(None, 32), (None, 32)])] == expected_shapes
    assert model.compute_mask([a, b], [None, None]) == [None, None]
    assert [tuple(s) for s in model.compute_output_shape(
        [(None, 32), (None, 32)])] == expected_shapes

    # we don't check the names of the first 2 layers (inputs) because
    # the ordering of same-level layers is not fixed
    expected_names = ['dense_1', 'merge', 'dense_2', 'dense_3']
    assert [l.name for l in model.layers][2:] == expected_names
    assert [l.name for l in model._input_layers] == ['input_a', 'input_b']
    assert [l.name for l in model._output_layers] == ['dense_2', 'dense_3']

    # actually run the model
    fn = K.function(model.inputs, model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs = fn([input_a_np, input_b_np])
    assert [x.shape for x in fn_outputs] == [(10, 64), (10, 5)]

    # test get_source_inputs
    source_inputs = get_source_inputs(c)
    assert source_inputs[0] is a
    assert source_inputs[1] is b

    # serialization / deserialization
    json_config = model.to_json()
    recreated_model = model_from_json(json_config)
    recreated_model.compile('rmsprop', 'mse')

    assert [l.name for l in recreated_model.layers][2:] == expected_names
    assert [l.name for l in recreated_model._input_layers] == ['input_a',
                                                               'input_b']
    assert [l.name for l in recreated_model._output_layers] == ['dense_2',
                                                                'dense_3']

    fn = K.function(recreated_model.inputs, recreated_model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs = fn([input_a_np, input_b_np])
    assert [x.shape for x in fn_outputs] == [(10, 64), (10, 5)]


def test_recursion():
    ####################################################
    # test recursion

    a = Input(shape=(32,), name='input_a')
    b = Input(shape=(32,), name='input_b')

    dense = Dense(16, name='dense_1')
    a_2 = dense(a)
    b_2 = dense(b)
    merged = layers.concatenate([a_2, b_2], name='merge')
    c = Dense(64, name='dense_2')(merged)
    d = Dense(5, name='dense_3')(c)

    model = Model(inputs=[a, b], outputs=[c, d], name='model')

    e = Input(shape=(32,), name='input_e')
    f = Input(shape=(32,), name='input_f')
    g, h = model([e, f])

    # g2, h2 = model([e, f])

    assert tuple(g.shape) == tuple(c.shape)
    assert tuple(h.shape) == tuple(d.shape)

    # test separate manipulation of different layer outputs
    i = Dense(7, name='dense_4')(h)

    final_model = Model(inputs=[e, f], outputs=[i, g], name='final')
    assert len(final_model.inputs) == 2
    assert len(final_model.outputs) == 2
    assert len(final_model.layers) == 4

    # we don't check the names of the first 2 layers (inputs) because
    # the ordering of same-level layers is not fixed
    expected_shapes = [(10, 7), (10, 64)]
    assert [layer.name for layer in final_model.layers][2:] == ['model',
                                                                'dense_4']
    assert model.compute_mask([e, f], [None, None]) == [None, None]
    assert [tuple(s) for s in final_model.compute_output_shape(
        [(10, 32), (10, 32)])] == expected_shapes

    # run the recursive model
    fn = K.function(final_model.inputs, final_model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs = fn([input_a_np, input_b_np])
    assert [x.shape for x in fn_outputs] == [(10, 7), (10, 64)]

    # test serialization
    model_config = final_model.get_config()
    print(json.dumps(model_config, indent=4))
    recreated_model = Model.from_config(model_config)

    fn = K.function(recreated_model.inputs, recreated_model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs = fn([input_a_np, input_b_np])
    assert [x.shape for x in fn_outputs] == [(10, 7), (10, 64)]

    ####################################################
    # test multi-input multi-output

    j = Input(shape=(32,), name='input_j')
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])

    o = Input(shape=(32,), name='input_o')
    p = Input(shape=(32,), name='input_p')
    q, r = model([o, p])

    assert tuple(n.shape) == (None, 5)
    assert tuple(q.shape) == (None, 64)
    s = layers.concatenate([n, q], name='merge_nq')
    assert tuple(s.shape) == (None, 64 + 5)

    # test with a single output as a 1-elem list
    multi_io_model = Model([j, k, o, p], [s])

    fn = K.function(multi_io_model.inputs, multi_io_model.outputs)
    fn_outputs = fn([np.random.random((10, 32)), np.random.random((10, 32)),
                     np.random.random((10, 32)), np.random.random((10, 32))])
    assert [x.shape for x in fn_outputs] == [(10, 69)]

    # test with a single output as a tensor
    multi_io_model = Model([j, k, o, p], s)

    fn = K.function(multi_io_model.inputs, multi_io_model.outputs)
    fn_outputs = fn([np.random.random((10, 32)), np.random.random((10, 32)),
                     np.random.random((10, 32)), np.random.random((10, 32))])
    # note that the output of the K.function will still be a 1-elem list
    assert [x.shape for x in fn_outputs] == [(10, 69)]

    # test serialization
    model_config = multi_io_model.get_config()
    recreated_model = Model.from_config(model_config)

    fn = K.function(recreated_model.inputs, recreated_model.outputs)
    fn_outputs = fn([np.random.random((10, 32)), np.random.random((10, 32)),
                     np.random.random((10, 32)), np.random.random((10, 32))])
    # note that the output of the K.function will still be a 1-elem list
    assert [x.shape for x in fn_outputs] == [(10, 69)]

    config = model.get_config()
    Model.from_config(config)

    model.summary()
    json_str = model.to_json()
    model_from_json(json_str)

    yaml_str = model.to_yaml()
    model_from_yaml(yaml_str)

    ####################################################
    # test invalid graphs

    # input is not an Input tensor
    j = Input(shape=(32,), name='input_j')
    j = Dense(32)(j)
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])

    with pytest.raises(ValueError):
        Model([j, k], [m, n])

    # disconnected graph
    j = Input(shape=(32,), name='input_j')
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])
    with pytest.raises(ValueError):
        Model([j], [m, n])

    # redundant outputs
    j = Input(shape=(32,), name='input_j')
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])
    # this should work with a warning
    Model([j, k], [m, n, n])

    # redundant inputs
    j = Input(shape=(32,), name='input_j')
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])
    with pytest.raises(ValueError):
        Model([j, k, j], [m, n])

    ####################################################
    # test calling layers/models on placeholders
    j = Input(shape=(32,), name='input_j')
    k = Input(shape=(32,), name='input_k')
    m, n = model([j, k])
    outer_model = Model([j, k], [m, n])

    j_tf = K.placeholder(shape=(None, 32), dtype=K.floatx())
    k_tf = K.placeholder(shape=(None, 32), dtype=K.floatx())
    m_tf, n_tf = outer_model([j_tf, k_tf])
    assert K.int_shape(m_tf) == (None, 64)
    assert K.int_shape(n_tf) == (None, 5)

    # test merge
    layers.concatenate([j_tf, k_tf], axis=1)
    layers.add([j_tf, k_tf])

    # test tensor input
    x = K.placeholder(shape=(None, 2), dtype=K.floatx())
    InputLayer(input_tensor=x)

    x = Input(tensor=x)
    Dense(2)(x)


def test_load_layers():
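    # Build a TimeDistributed(Conv2D) + Bidirectional(ConvLSTM2D) model,
    # fabricate weight arrays in the Keras 1 layout, convert them with
    # preprocess_weights_for_loading, load them with batch_set_value, and
    # check that the loaded values match the converted arrays.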
    from keras.layers import ConvLSTM2D, TimeDistributed
    from keras.layers import Bidirectional, Conv2D, Input
    from keras.models import Model

    if K.backend() == 'tensorflow' or K.backend() == 'cntk':
        inputs = Input(shape=(10, 20, 20, 1))
    else:
        inputs = Input(shape=(10, 1, 20, 20))
    td_conv = TimeDistributed(Conv2D(15, (5, 5)))(inputs)
    bi_conv = Bidirectional(ConvLSTM2D(10, (3, 3)),
                            merge_mode='concat')(td_conv)
    model = Model(inputs=inputs, outputs=bi_conv)

    weight_value_tuples = []

    # TimeDistributed Conv2D layer
    # use 'channels_first' data format to check that
    # the function is being called correctly for Conv2D
    # old: (filters, stack_size, kernel_rows, kernel_cols)
    # new: (kernel_rows, kernel_cols, stack_size, filters)
    weight_tensor_td_conv_old = list()
    weight_tensor_td_conv_old.append(np.zeros((15, 1, 5, 5)))
    weight_tensor_td_conv_old.append(np.zeros((15,)))
    td_conv_layer = model.layers[1]
    td_conv_layer.layer.data_format = 'channels_first'
    weight_tensor_td_conv_new = preprocess_weights_for_loading(
        td_conv_layer,
        weight_tensor_td_conv_old,
        original_keras_version='1')
    symbolic_weights = td_conv_layer.weights
    assert len(symbolic_weights) == len(weight_tensor_td_conv_new)
    weight_value_tuples += zip(symbolic_weights, weight_tensor_td_conv_new)

    # Bidirectional ConvLSTM2D layer
    # the old ConvLSTM2D took a list of 12 weight tensors per direction;
    # the conversion returns a list of 3 concatenated, larger tensors.
    weights_bi_conv_old = []
    for j in range(2):  # bidirectional: forward and backward layers
        for i in range(4):  # one kernel/recurrent kernel/bias per gate
            weights_bi_conv_old.append(np.zeros((3, 3, 15, 10)))  # kernel
            weights_bi_conv_old.append(np.zeros((3, 3, 10, 10)))  # recurrent kernel
            weights_bi_conv_old.append(np.zeros((10,)))  # bias

    bi_convlstm_layer = model.layers[2]
    weights_bi_conv_new = preprocess_weights_for_loading(
        bi_convlstm_layer,
        weights_bi_conv_old,
        original_keras_version='1')

    symbolic_weights = bi_convlstm_layer.weights
    assert len(symbolic_weights) == len(weights_bi_conv_new)
    weight_value_tuples += zip(symbolic_weights, weights_bi_conv_new)

    K.batch_set_value(weight_value_tuples)

    assert np.all(K.eval(model.layers[1].weights[0]) ==
                  weight_tensor_td_conv_new[0])
    assert np.all(K.eval(model.layers[1].weights[1]) ==
                  weight_tensor_td_conv_new[1])
    assert np.all(K.eval(model.layers[2].weights[0]) == weights_bi_conv_new[0])
    assert np.all(K.eval(model.layers[2].weights[1]) == weights_bi_conv_new[1])
    assert np.all(K.eval(model.layers[2].weights[2]) == weights_bi_conv_new[2])
    assert np.all(K.eval(model.layers[2].weights[3]) == weights_bi_conv_new[3])
    assert np.all(K.eval(model.layers[2].weights[4]) == weights_bi_conv_new[4])
    assert np.all(K.eval(model.layers[2].weights[5]) == weights_bi_conv_new[5])


def convert_weights(layer, weights):
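    # Map a layer's Keras 2 weights back to the Keras 1 layout so that
    # preprocess_weights_for_loading can be exercised as a round trip.
    # In Keras 2, recurrent layers store one concatenated tensor per
    # weight kind (kernel, recurrent kernel, bias); Keras 1 stored one
    # tensor per gate (3 for GRU, 4 for LSTM/ConvLSTM2D). The swap of
    # w[1] and w[2] accounts for the differing gate order between the
    # two formats.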
    if layer.__class__.__name__ == 'GRU':
        W = [np.split(w, 3, axis=-1) for w in weights]
        return sum(map(list, zip(*W)), [])
    elif layer.__class__.__name__ in ('LSTM', 'ConvLSTM2D'):
        W = [np.split(w, 4, axis=-1) for w in weights]
        for w in W:
            w[2], w[1] = w[1], w[2]
        return sum(map(list, zip(*W)), [])
    elif layer.__class__.__name__ == 'Conv2DTranspose':
        return [np.transpose(weights[0], (2, 3, 0, 1)), weights[1]]
    return weights


@pytest.mark.parametrize("layer", [
    layers.GRU(2, input_shape=[3, 5]),
    layers.LSTM(2, input_shape=[3, 5]),
    layers.ConvLSTM2D(5, (3, 3),
                      input_shape=[6, 6, 6, 6],
                      data_format='channels_first'),
], ids=['GRU', 'LSTM', 'ConvLSTM2D'])
def test_preprocess_weights_for_loading(layer):
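    # Round trip: convert_weights maps the layer's current weights into
    # the Keras 1 layout, and preprocess_weights_for_loading (with
    # original_keras_version='1') should map them back unchanged.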
    # A model is needed to initialize weights.
    _ = Sequential([layer])
    weights1 = layer.get_weights()
    weights2 = preprocess_weights_for_loading(
        layer, convert_weights(layer, weights1),
        original_keras_version='1')
    assert all([np.allclose(x, y, 1e-5)
                for (x, y) in zip(weights1, weights2)])


@pytest.mark.parametrize("layer", [
    layers.Conv2D(2, (3, 3), input_shape=[5, 5, 3]),
    layers.Conv2DTranspose(2, (5, 5),
                           input_shape=[7, 7, 3],
                           data_format='channels_first'),
], ids=['Conv2D', 'Conv2DTranspose'])
def test_preprocess_weights_for_loading_for_model(layer):
    model = Sequential([layer])
    weights1 = model.get_weights()
    weights2 = preprocess_weights_for_loading(
        model, convert_weights(layer, weights1),
        original_keras_version='1')
    assert all([np.allclose(x, y, 1e-5)
                for (x, y) in zip(weights1, weights2)])


@pytest.mark.parametrize('layer_class,args', [
    (layers.GRU, {'units': 2, 'input_shape': [3, 5]}),
    (layers.GRU, {'units': 2, 'input_shape': [3, 5], 'reset_after': True}),
    (layers.LSTM, {'units': 2, 'input_shape': [3, 5]}),
])
def test_preprocess_weights_for_loading_rnn_should_be_idempotent(layer_class,
                                                                 args):
    """
    Loading weights from an RNN class to itself should not convert
    the weights.
    """
    # layer can be instantiated only for supported backends
    layer = layer_class(**args)
    # A model is needed to initialize weights.
    _ = Sequential([layer])
    weights1 = layer.get_weights()
    weights2 = preprocess_weights_for_loading(layer, weights1)
    assert all([np.allclose(x, y, 1e-5) for (x, y) in zip(weights1, weights2)])


def test_recursion_with_bn_and_loss():
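    # A nested model whose inner model carries both an activity
    # regularizer (a loss) and BatchNormalization (updates); both the
    # inner and the outer model should compile and fit.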
    model1 = Sequential([
        layers.Dense(5, input_dim=5, activity_regularizer='l1'),
        layers.BatchNormalization(),
        layers.Dense(5),
    ])

    inputs = layers.Input(shape=(5,))
    outputs = model1(inputs)
    model2 = Model(inputs=inputs, outputs=outputs)

    model1.compile(optimizer='sgd', loss='categorical_crossentropy')
    model2.compile(optimizer='sgd', loss='categorical_crossentropy')

    x = np.ones((3, 5))
    y = np.ones((3, 5))
    model1.fit(x, y, verbose=0, epochs=1)
    model2.fit(x, y, verbose=0, epochs=1)


def test_activity_regularization_with_model_composition():

    def reg(x):
        return K.sum(x)

    net_a_input = Input((2,))
    net_a = net_a_input
    net_a = Dense(2, kernel_initializer='ones',
                  use_bias=False,
                  activity_regularizer=reg)(net_a)
    model_a = Model([net_a_input], [net_a])

    net_b_input = Input((2,))
    net_b = model_a(net_b_input)
    model_b = Model([net_b_input], [net_b])

    model_b.compile(optimizer='sgd', loss=None)
    x = np.ones((1, 2))
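    # With a ones kernel and no bias, x = [[1., 1.]] maps to [[2., 2.]],
    # so the activity regularizer contributes K.sum([[2., 2.]]) == 4.
    # With loss=None, evaluate() returns just this regularization term.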
    loss = model_b.evaluate(x)
    assert loss == 4


def test_shared_layer_depth_is_correct():
    # Basic outline here: we have a shared embedding layer, and two inputs
    # that go through different depths of computation in the graph before
    # the final output. We need the computed depth of the two input layers
    # to be the same, because they both pass through the embedding layer
    # before anything else happens. That's what this test checks.
    from keras.layers import Embedding, Input, Dense, Concatenate
    from keras.models import Model
    input1 = Input(shape=(10,), name='input1')
    input2 = Input(shape=(10,), name='input2')
    embedding_layer = Embedding(name='embedding', input_dim=5, output_dim=10)
    embedded_input1 = embedding_layer(input1)
    embedded_input2 = embedding_layer(input2)
    transformed_input2 = Dense(6)(Dense(5)(Dense(3)(embedded_input2)))
    final_output = Dense(2)(
        Concatenate()([embedded_input1, transformed_input2]))
    model = Model(inputs=[input1, input2], outputs=final_output)


def test_layer_sharing_at_heterogeneous_depth():
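    # A and B are each applied twice, at different depths in the graph;
    # the config round trip below must rebuild the same topology so that
    # predictions are preserved.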
    x_val = np.random.random((10, 5))

    x = Input(shape=(5,))
    A = Dense(5, name='A')
    B = Dense(5, name='B')
    output = A(B(A(B(x))))
    M = Model(x, output)

    output_val = M.predict(x_val)

    config = M.get_config()
    weights = M.get_weights()

    M2 = Model.from_config(config)
    M2.set_weights(weights)

    output_val_2 = M2.predict(x_val)
    np.testing.assert_allclose(output_val, output_val_2, atol=1e-6)


def test_layer_sharing_at_heterogeneous_depth_with_concat():
    input_shape = (16, 9, 3)
    input_layer = Input(shape=input_shape)

    A = Dense(3, name='dense_A')
    B = Dense(3, name='dense_B')
    C = Dense(3, name='dense_C')

    x1 = B(A(input_layer))
    x2 = A(C(input_layer))
    output = layers.concatenate([x1, x2])

    M = Model(inputs=input_layer, outputs=output)

    x_val = np.random.random((10, 16, 9, 3))
    output_val = M.predict(x_val)

    config = M.get_config()
    weights = M.get_weights()

    M2 = Model.from_config(config)
    M2.set_weights(weights)

    output_val_2 = M2.predict(x_val)
    np.testing.assert_allclose(output_val, output_val_2, atol=1e-6)


def DISABLED_test_layer_sharing_at_heterogeneous_depth_order():
    # This tests for the bug reported in
    # https://github.com/keras-team/keras/issues/11159.
    # It occurs with layer sharing at heterogeneous depth when
    # the layers need to be applied in an order that differs from
    # the order in which they appear in the config.

    input_shape = (1, 12)
    input_layer = Input(shape=input_shape)

    A = Dense(12, name='layer_a')
    r1 = layers.Reshape((12,))(input_layer)
    Aout1 = A(r1)

    r2 = layers.Reshape((12,))(A(input_layer))
    Aout2 = A(r2)

    # Note: if the order of the layers in the concat is
    # changed to ([Aout1, Aout2]), the bug doesn't trigger
    c1 = layers.concatenate([Aout2, Aout1])
    output = Dense(2, name='layer_b')(c1)

    M = Model(inputs=input_layer, outputs=output)

    x_val = np.random.random((10,) + input_shape)
    output_val = M.predict(x_val)

    config = M.get_config()
    weights = M.get_weights()

    M2 = Model.from_config(config)
    M2.set_weights(weights)

    output_val_2 = M2.predict(x_val)
    np.testing.assert_allclose(output_val, output_val_2, atol=1e-6)


def test_multi_output_mask():
    """Fixes #7589."""
    class TestMultiOutputLayer(Layer):
        def __init__(self, **kwargs):
            super(TestMultiOutputLayer, self).__init__(**kwargs)

        def call(self, inputs, **kwargs):
            return [K.abs(inputs), K.abs(inputs)]

        def compute_output_shape(self, input_shape):
            out_shape = super(TestMultiOutputLayer, self).compute_output_shape(
                input_shape)
            return [out_shape, out_shape]

    class TestMultiInputLayer(Layer):
        def __init__(self, **kwargs):
            super(TestMultiInputLayer, self).__init__(**kwargs)

        def call(self, inputs, **kwargs):
            negative, positive = inputs
            return negative + positive

    input_layer = Input(shape=(16, 16, 3))
    x, y = TestMultiOutputLayer()(input_layer)
    z = TestMultiInputLayer()([x, y])
    _ = Model(inputs=input_layer, outputs=z)
    assert K.int_shape(z)[1:] == (16, 16, 3)


def test_constant_initializer_with_numpy():
    model = Sequential()
    model.add(Dense(2, input_shape=(3,),
                    kernel_initializer=Constant(1.)))
    model.add(Dense(3))
    model.compile(loss='mse', optimizer='sgd', metrics=['acc'])

    json_str = model.to_json()
    model_from_json(json_str).summary()

    yaml_str = model.to_yaml()
    model_from_yaml(yaml_str).summary()


@pytest.mark.skipif(K.backend() == 'cntk',
                    reason='Float64 not supported with CNTK.')
def test_initialization_dtype():
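    # The dtype passed to the Layer/Model constructor should govern the
    # dtype of weights created through add_weight.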
    class TestLayer(Layer):
        def __init__(self):
            super(TestLayer, self).__init__(dtype='int64')
            self.w = self.add_weight('w', [], initializer=Constant(1))

    layer = TestLayer()
    assert K.dtype(layer.w) == 'int64'

    class TestModel(Model):
        def __init__(self):
            super(TestModel, self).__init__(dtype='int64')
            self.w = self.add_weight('w', [], initializer=Constant(1))

    model = TestModel()
    assert K.dtype(model.w) == 'int64'


if __name__ == '__main__':
    pytest.main([__file__])