1#!/usr/bin/env python 2# 3# Copyright 2012,2013 Free Software Foundation, Inc. 4# 5# This file is part of GNU Radio 6# 7# GNU Radio is free software; you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation; either version 3, or (at your option) 10# any later version. 11# 12# GNU Radio is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with GNU Radio; see the file COPYING. If not, write to 19# the Free Software Foundation, Inc., 51 Franklin Street, 20# Boston, MA 02110-1301, USA. 21# 22 23 24from gnuradio import gr, gr_unittest, blocks 25import math 26 27class test_vector_map(gr_unittest.TestCase): 28 29 def setUp(self): 30 self.tb = gr.top_block() 31 32 def tearDown(self): 33 self.tb = None 34 35 def test_reversing(self): 36 # Chunk data in blocks of N and reverse the block contents. 37 N = 5 38 src_data = list(range(0, 20)) 39 expected_result = [] 40 for i in range(N-1, len(src_data), N): 41 for j in range(0, N): 42 expected_result.append(1.0*(i-j)) 43 mapping = [list(reversed([(0, i) for i in range(0, N)]))] 44 src = blocks.vector_source_f(src_data, False, N) 45 vmap = blocks.vector_map(gr.sizeof_float, (N, ), mapping) 46 dst = blocks.vector_sink_f(N) 47 self.tb.connect(src, vmap, dst) 48 self.tb.run() 49 result_data = list(dst.data()) 50 self.assertEqual(expected_result, result_data) 51 52 def test_vector_to_streams(self): 53 # Split an input vector into N streams. 54 N = 5 55 M = 20 56 src_data = list(range(0, M)) 57 expected_results = [] 58 for n in range(0, N): 59 expected_results.append(list(range(n, M, N))) 60 mapping = [[(0, n)] for n in range(0, N)] 61 src = blocks.vector_source_f(src_data, False, N) 62 vmap = blocks.vector_map(gr.sizeof_float, (N, ), mapping) 63 dsts = [blocks.vector_sink_f(1) for n in range(0, N)] 64 self.tb.connect(src, vmap) 65 for n in range(0, N): 66 self.tb.connect((vmap, n), dsts[n]) 67 self.tb.run() 68 for n in range(0, N): 69 result_data = list(dsts[n].data()) 70 self.assertEqual(expected_results[n], result_data) 71 72 def test_interleaving(self): 73 # Takes 3 streams (a, b and c) 74 # Outputs 2 streams. 75 # First (d) is interleaving of a and b. 76 # Second (e) is interleaving of a and b and c. c is taken in 77 # chunks of 2 which are reversed. 78 A = (1, 2, 3, 4, 5) 79 B = (11, 12, 13, 14, 15) 80 C = (99, 98, 97, 96, 95, 94, 93, 92, 91, 90) 81 expected_D = (1, 11, 2, 12, 3, 13, 4, 14, 5, 15) 82 expected_E = (1, 11, 98, 99, 2, 12, 96, 97, 3, 13, 94, 95, 83 4, 14, 92, 93, 5, 15, 90, 91) 84 mapping = [[(0, 0), (1, 0)], # mapping to produce D 85 [(0, 0), (1, 0), (2, 1), (2, 0)], # mapping to produce E 86 ] 87 srcA = blocks.vector_source_f(A, False, 1) 88 srcB = blocks.vector_source_f(B, False, 1) 89 srcC = blocks.vector_source_f(C, False, 2) 90 vmap = blocks.vector_map(gr.sizeof_int, (1, 1, 2), mapping) 91 dstD = blocks.vector_sink_f(2) 92 dstE = blocks.vector_sink_f(4) 93 self.tb.connect(srcA, (vmap, 0)) 94 self.tb.connect(srcB, (vmap, 1)) 95 self.tb.connect(srcC, (vmap, 2)) 96 self.tb.connect((vmap, 0), dstD) 97 self.tb.connect((vmap, 1), dstE) 98 self.tb.run() 99 self.assertEqual(expected_D, dstD.data()) 100 self.assertEqual(expected_E, dstE.data()) 101 102if __name__ == '__main__': 103 gr_unittest.run(test_vector_map, "test_vector_map.xml") 104 105