1import sys
2import unittest
3
4import numpy
5
6from ideep4py import array
7from ideep4py import pooling2DParam
8from ideep4py import convolution2DParam
9from ideep4py import pooling2D
10from ideep4py import convolution2D
11from ideep4py import linear
12
13try:
14    import testing
15    from testing import condition
16    from testing.conv import col2im_cpu
17    from testing.conv import im2col_cpu
18except Exception as ex:
19    print('*** testing directory is missing: %s' % ex)
20    sys.exit(-1)
21
22
@testing.parameterize(*testing.product({
    'dtype': [numpy.float32],
    'bs': [256],
    'ic': [256],
    'oc': [256],
}))
class TestPooling2DPyF32(unittest.TestCase):
    """Check ideep4py max pooling against an im2col/col2im NumPy reference.

    The forward pass pools the output of ``convolution2D.Forward``; the
    backward pass pools the gradient produced by ``linear.BackwardData``.
    ``dtype``, ``bs``, ``ic`` and ``oc`` are injected as attributes by the
    ``testing.parameterize`` decorator.
    """

    def setUp(self):
        """Create random operands, parameter objects and tolerances."""
        def rand(*shape):
            # Uniform samples in [-1, 1), cast to the parameterized dtype.
            return numpy.random.uniform(-1, 1, shape).astype(self.dtype)

        # Input and filter of the convolution that feeds the pooling.
        self.x = rand(self.bs, self.ic, 13, 13)
        self.W = rand(self.oc, self.ic, 3, 3)

        # Operands of linear.BackwardData; its result is later reshaped to
        # (bs, oc, 6, 6) and used as the pooling output gradient.
        self.linear_gy = rand(self.bs, 4096)
        self.linear_W = rand(4096, 9216)

        # NOTE(review): the positional integers below are passed through
        # unchanged; presumably kernel/stride/padding settings -- confirm
        # against the ideep4py parameter constructors.
        conv_out_shape = (self.bs, self.oc, 13, 13)
        self.cp = convolution2DParam(conv_out_shape, 1, 1, 1, 1, 1, 1, 1, 1)

        pool_out_shape = (self.bs, self.oc, 6, 6)
        self.pp_fwd = pooling2DParam(
            pool_out_shape, 3, 3, 2, 2, 0, 0, 0, 0,
            pooling2DParam.pooling_max)
        self.pp_bwd = pooling2DParam(
            self.x.shape, 3, 3, 2, 2, 0, 0, 0, 0,
            pooling2DParam.pooling_max)

        self.check_forward_options = dict(atol=1e-5, rtol=1e-4)
        self.check_backward_options = dict(atol=1e-5, rtol=1e-4)

    def check_forward(self, x, W, cp, pp):
        """Compare pooled values and argmax indexes with the reference.

        Stores ``self.indexes_act`` (indexes returned by ideep4py) and
        ``self.indexes_ref`` (reference argmax); ``check_backward`` reads
        both, so this method has to run first.
        """
        conv_out = convolution2D.Forward(array(x), array(W), None, cp)
        pooled = pooling2D.Forward(conv_out, pp)
        actual_y, self.indexes_act = pooled

        conv_out_np = numpy.array(conv_out, dtype=self.dtype)
        actual_y_np = numpy.array(actual_y, dtype=self.dtype)
        actual_idx_np = numpy.array(self.indexes_act, dtype=self.dtype)

        # Reference: unfold every 3x3 window, then reduce over the window
        # axis to obtain both the maximum and its position in the window.
        windows = im2col_cpu(
            conv_out_np, 3, 3, 2, 2, 0, 0,
            pval=-float('inf'), cover_all=True)
        n, c, kh, kw, out_h, out_w = windows.shape
        windows = windows.reshape(n, c, kh * kw, out_h, out_w)
        self.indexes_ref = windows.argmax(axis=2)
        expected_y = windows.max(axis=2)

        tolerances = self.check_forward_options
        numpy.testing.assert_allclose(actual_y_np, expected_y, **tolerances)
        numpy.testing.assert_allclose(
            actual_idx_np, self.indexes_ref, **tolerances)

    def check_backward(self, gy, W, pp):
        """Compare ``pooling2D.Backward`` with a col2im-based reference.

        Relies on ``self.indexes_act`` and ``self.indexes_ref`` having been
        set by a preceding ``check_forward`` call.
        """
        gy_in = linear.BackwardData(array(W), array(gy))
        gy_in = gy_in.reshape(self.bs, self.oc, 6, 6)
        gx_act = pooling2D.Backward(gy_in, self.indexes_act, pp)

        gy_in_npy = numpy.array(gy_in, dtype=self.dtype)
        actual_gx = numpy.array(gx_act, dtype=self.dtype)

        n, c, out_h, out_w = gy_in_npy.shape
        h, w = 13, 13
        kh, kw = 3, 3
        window = kh * kw

        # One flat slot per (output element, window position); each output
        # gradient is written to the slot picked by the reference argmax.
        flat_cols = numpy.zeros(
            (n * c * out_h * out_w * window), dtype=self.dtype)
        slots = self.indexes_ref.flatten()
        slots += numpy.arange(0, slots.size * window, window)
        # NOTE(review): ravel() is called on the reshaped ideep4py result,
        # not on gy_in_npy -- kept exactly as in the original.
        flat_cols[slots] = gy_in.ravel()

        # (n, c, out_h, out_w, kh, kw) -> (n, c, kh, kw, out_h, out_w)
        cols = flat_cols.reshape(n, c, out_h, out_w, kh, kw)
        cols = cols.transpose(0, 1, 4, 5, 2, 3)
        expected_gx = col2im_cpu(cols, 2, 2, 0, 0, h, w)

        numpy.testing.assert_allclose(
            actual_gx, expected_gx, **self.check_backward_options)

    @condition.retry(3)
    def test_alexnet_max_pooling_3_cpu(self):
        """Run the forward check, then the backward check that reuses it."""
        self.check_forward(self.x, self.W, self.cp, self.pp_fwd)
        self.check_backward(self.linear_gy, self.linear_W, self.pp_bwd)
106
107
# Pass this module's name and file path to the project's `testing` helper.
# NOTE(review): `run_module` is defined outside this file; presumably it
# launches the tests only when the module is executed directly -- confirm.
testing.run_module(__name__, __file__)
109