"""Max-pooling test for ideep4py chained after a convolution.

The forward pass feeds a ``convolution2D.Forward`` result into
``pooling2D.Forward``; the backward pass feeds a ``linear.BackwardData``
result into ``pooling2D.Backward``.  Both results are compared against a
NumPy reference built with ``im2col_cpu`` / ``col2im_cpu``.
"""
import sys
import unittest

import numpy

from ideep4py import array
from ideep4py import pooling2DParam
from ideep4py import convolution2DParam
from ideep4py import pooling2D
from ideep4py import convolution2D
from ideep4py import linear

try:
    import testing
    from testing import condition
    from testing.conv import col2im_cpu
    from testing.conv import im2col_cpu
except Exception as ex:
    print('*** testing directory is missing: %s' % ex)
    sys.exit(-1)


@testing.parameterize(*testing.product({
    'dtype': [numpy.float32],
    'bs': [256],
    'ic': [256],
    'oc': [256],
}))
class TestPooling2DPyF32(unittest.TestCase):
    """Check ideep4py max pooling (forward and backward) in float32.

    ``dtype``, ``bs``, ``ic`` and ``oc`` are injected by
    ``testing.parameterize``.
    """

    def setUp(self):
        """Create random inputs and the convolution/pooling parameters."""
        # Convolution input and 3x3 filter.
        self.x = numpy.random.uniform(
            -1, 1, (self.bs, self.ic, 13, 13)).astype(self.dtype)
        self.W = numpy.random.uniform(
            -1, 1, (self.oc, self.ic, 3, 3)).astype(self.dtype)

        # Upstream gradient and weight for the linear layer that produces
        # the pooling output gradient.  9216 == 256 * 6 * 6, which is what
        # check_backward reshapes to (bs, oc, 6, 6).
        self.linear_gy = numpy.random.uniform(
            -1, 1, (self.bs, 4096)).astype(self.dtype)
        self.linear_W = \
            numpy.random.uniform(-1, 1, (4096, 9216)).astype(self.dtype)

        # NOTE(review): the eight trailing integers presumably encode
        # dilation/stride/padding for the convolution -- confirm against
        # the convolution2DParam signature.
        self.cp = convolution2DParam(
            (self.bs, self.oc, 13, 13), 1, 1, 1, 1, 1, 1, 1, 1)

        # NOTE(review): presumably 3x3 kernel, stride 2, zero padding,
        # matching the im2col_cpu arguments used for the reference; the
        # first argument is the result shape of the respective direction.
        self.pp_fwd = pooling2DParam(
            (self.bs, self.oc, 6, 6), 3, 3, 2, 2, 0, 0, 0, 0,
            pooling2DParam.pooling_max)
        self.pp_bwd = pooling2DParam(
            self.x.shape, 3, 3, 2, 2, 0, 0, 0, 0,
            pooling2DParam.pooling_max)

        self.check_forward_options = {'atol': 1e-5, 'rtol': 1e-4}
        self.check_backward_options = {'atol': 1e-5, 'rtol': 1e-4}

    def check_forward(self, x, W, cp, pp):
        """Run convolution + max pooling and compare with a NumPy reference.

        Stores ``self.indexes_act`` (indexes returned by ideep4py) and
        ``self.indexes_ref`` (reference argmax) for use by
        ``check_backward``.

        Args:
            x: Convolution input as a NumPy array.
            W: Convolution filter as a NumPy array.
            cp: ``convolution2DParam`` for the convolution.
            pp: ``pooling2DParam`` for the forward pooling.
        """
        x_in = convolution2D.Forward(array(x), array(W), None, cp)
        y_act, self.indexes_act = pooling2D.Forward(x_in, pp)

        x_in_npy = numpy.array(x_in, dtype=self.dtype)
        y_act_npy = numpy.array(y_act, dtype=self.dtype)
        indexes_act_npy = numpy.array(self.indexes_act, dtype=self.dtype)

        # Reference: unfold every pooling window, then take max/argmax
        # across the flattened kernel axis.
        col = im2col_cpu(
            x_in_npy, 3, 3, 2, 2, 0, 0,
            pval=-float('inf'), cover_all=True)
        n, c, kh, kw, out_h, out_w = col.shape
        col = col.reshape(n, c, kh * kw, out_h, out_w)
        self.indexes_ref = col.argmax(axis=2)
        y_ref = col.max(axis=2)

        numpy.testing.assert_allclose(
            y_act_npy, y_ref, **self.check_forward_options)
        numpy.testing.assert_allclose(
            indexes_act_npy, self.indexes_ref, **self.check_forward_options)

    def check_backward(self, gy, W, pp):
        """Run max-pooling backward and compare with a NumPy reference.

        Must be called after ``check_forward``, which sets
        ``self.indexes_act`` and ``self.indexes_ref``.

        Args:
            gy: Upstream gradient of the linear layer as a NumPy array.
            W: Weight of the linear layer as a NumPy array.
            pp: ``pooling2DParam`` for the backward pooling.
        """
        gy_in = linear.BackwardData(array(W), array(gy))
        gy_in = gy_in.reshape(self.bs, self.oc, 6, 6)
        gx_act = pooling2D.Backward(gy_in, self.indexes_act, pp)

        gy_in_npy = numpy.array(gy_in, dtype=self.dtype)
        gx_act_npy = numpy.array(gx_act, dtype=self.dtype)

        n, c, out_h, out_w = gy_in_npy.shape
        h = 13
        w = 13
        kh = 3
        kw = 3
        gcol = numpy.zeros(
            (n * c * out_h * out_w * kh * kw), dtype=self.dtype)
        # flatten() returns a copy, so the in-place offset below does not
        # modify self.indexes_ref.
        indexes = self.indexes_ref.flatten()
        # Turn each per-window argmax into an offset into the flat gcol
        # buffer (one kh * kw slot group per output element).
        indexes += numpy.arange(0, indexes.size * kh * kw, kh * kw)
        # Scatter from the NumPy copy so the reference path does not rely
        # on the array type under test.
        gcol[indexes] = gy_in_npy.ravel()
        gcol = gcol.reshape(n, c, out_h, out_w, kh, kw)
        # Reorder to (n, c, kh, kw, out_h, out_w), the axis order unpacked
        # from im2col_cpu's result in check_forward.
        gcol = numpy.swapaxes(gcol, 2, 4)
        gcol = numpy.swapaxes(gcol, 3, 5)
        gx_ref = col2im_cpu(gcol, 2, 2, 0, 0, h, w)

        numpy.testing.assert_allclose(
            gx_act_npy, gx_ref, **self.check_backward_options)

    @condition.retry(3)
    def test_alexnet_max_pooling_3_cpu(self):
        """Forward then backward check; backward reuses forward indexes."""
        self.check_forward(self.x, self.W, self.cp, self.pp_fwd)
        self.check_backward(self.linear_gy, self.linear_W, self.pp_bwd)


testing.run_module(__name__, __file__)