1#!/usr/bin/env python 2# 3# Copyright 2008,2010,2013 Free Software Foundation, Inc. 4# 5# This file is part of GNU Radio 6# 7# GNU Radio is free software; you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation; either version 3, or (at your option) 10# any later version. 11# 12# GNU Radio is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with GNU Radio; see the file COPYING. If not, write to 19# the Free Software Foundation, Inc., 51 Franklin Street, 20# Boston, MA 02110-1301, USA. 21# 22 23 24from gnuradio import gr, gr_unittest, blocks 25import pmt 26import math 27 28def make_tag(key, value, offset, srcid=None): 29 tag = gr.tag_t() 30 tag.key = pmt.string_to_symbol(key) 31 tag.value = pmt.to_pmt(value) 32 tag.offset = offset 33 if srcid is not None: 34 tag.srcid = pmt.to_pmt(srcid) 35 return tag 36 37def compare_tags(a, b): 38 return a.offset == b.offset and pmt.equal(a.key, b.key) and \ 39 pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) 40 41class test_vector_sink_source(gr_unittest.TestCase): 42 43 def setUp(self): 44 self.tb = gr.top_block() 45 46 def tearDown(self): 47 self.tb = None 48 49 def test_001(self): 50 # Test that sink has data set in source for the simplest case 51 src_data = [float(x) for x in range(16)] 52 expected_result = tuple(src_data) 53 54 src = blocks.vector_source_f(src_data) 55 dst = blocks.vector_sink_f() 56 57 self.tb.connect(src, dst) 58 self.tb.run() 59 result_data = dst.data() 60 self.assertEqual(expected_result, result_data) 61 62 def test_002(self): 63 # Test vectors (the gnuradio vector I/O type) 64 src_data = [float(x) for x in range(16)] 65 expected_result = tuple(src_data) 66 67 src = blocks.vector_source_f(src_data, False, 2) 68 dst = blocks.vector_sink_f(2) 69 70 self.tb.connect(src, dst) 71 self.tb.run() 72 result_data = dst.data() 73 self.assertEqual(expected_result, result_data) 74 75 def test_003(self): 76 # Test that we can only make vectors (the I/O type) if the input 77 # vector has sufficient size 78 src_data = [float(x) for x in range(16)] 79 expected_result = tuple(src_data) 80 self.assertRaises(RuntimeError, lambda : blocks.vector_source_f(src_data, False, 3)) 81 82 def test_004(self): 83 # Test sending and receiving tagged streams 84 src_data = [float(x) for x in range(16)] 85 expected_result = tuple(src_data) 86 src_tags = tuple([make_tag('key', 'val', 0, 'src')]) 87 expected_tags = src_tags[:] 88 89 src = blocks.vector_source_f(src_data, repeat=False, tags=src_tags) 90 dst = blocks.vector_sink_f() 91 92 self.tb.connect(src, dst) 93 self.tb.run() 94 result_data = dst.data() 95 result_tags = dst.tags() 96 self.assertEqual(expected_result, result_data) 97 self.assertEqual(len(result_tags), 1) 98 self.assertTrue(compare_tags(expected_tags[0], result_tags[0])) 99 100 def test_005(self): 101 # Test that repeat works (with tagged streams) 102 length = 16 103 src_data = [float(x) for x in range(length)] 104 expected_result = tuple(src_data + src_data) 105 src_tags = tuple([make_tag('key', 'val', 0, 'src')]) 106 expected_tags = tuple([make_tag('key', 'val', 0, 'src'), 107 make_tag('key', 'val', length, 'src')]) 108 109 src = blocks.vector_source_f(src_data, repeat=True, tags=src_tags) 110 head = blocks.head(gr.sizeof_float, 2*length) 111 dst = blocks.vector_sink_f() 112 113 self.tb.connect(src, head, dst) 114 self.tb.run() 115 result_data = dst.data() 116 result_tags = dst.tags() 117 self.assertEqual(expected_result, result_data) 118 self.assertEqual(len(result_tags), 2) 119 self.assertTrue(compare_tags(expected_tags[0], result_tags[0])) 120 self.assertTrue(compare_tags(expected_tags[1], result_tags[1])) 121 122 def test_006(self): 123 # Test set_data 124 src_data = [float(x) for x in range(16)] 125 expected_result = tuple(src_data) 126 127 src = blocks.vector_source_f((3,1,4)) 128 dst = blocks.vector_sink_f() 129 src.set_data(src_data) 130 131 self.tb.connect(src, dst) 132 self.tb.run() 133 result_data = dst.data() 134 self.assertEqual(expected_result, result_data) 135 136 def test_007(self): 137 # Test set_repeat 138 src_data = [float(x) for x in range(16)] 139 expected_result = tuple(src_data) 140 141 src = blocks.vector_source_f(src_data, True) 142 dst = blocks.vector_sink_f() 143 src.set_repeat(False) 144 145 self.tb.connect(src, dst) 146 # will timeout if set_repeat does not work 147 self.tb.run() 148 result_data = dst.data() 149 self.assertEqual(expected_result, result_data) 150 151 152if __name__ == '__main__': 153 gr_unittest.run(test_vector_sink_source, "test_vector_sink_source.xml") 154 155