1#!/usr/bin/env python
2#
3# Copyright 2008,2010,2013 Free Software Foundation, Inc.
4#
5# This file is part of GNU Radio
6#
7# GNU Radio is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 3, or (at your option)
10# any later version.
11#
12# GNU Radio is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with GNU Radio; see the file COPYING.  If not, write to
19# the Free Software Foundation, Inc., 51 Franklin Street,
20# Boston, MA 02110-1301, USA.
21#
22
23
24from gnuradio import gr, gr_unittest, blocks
25import pmt
26import math
27
28def make_tag(key, value, offset, srcid=None):
29    tag = gr.tag_t()
30    tag.key = pmt.string_to_symbol(key)
31    tag.value = pmt.to_pmt(value)
32    tag.offset = offset
33    if srcid is not None:
34        tag.srcid = pmt.to_pmt(srcid)
35    return tag
36
37def compare_tags(a, b):
38    return a.offset == b.offset and pmt.equal(a.key, b.key) and \
39           pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid)
40
41class test_vector_sink_source(gr_unittest.TestCase):
42
43    def setUp(self):
44        self.tb = gr.top_block()
45
46    def tearDown(self):
47        self.tb = None
48
49    def test_001(self):
50        # Test that sink has data set in source for the simplest case
51        src_data = [float(x) for x in range(16)]
52        expected_result = tuple(src_data)
53
54        src = blocks.vector_source_f(src_data)
55        dst = blocks.vector_sink_f()
56
57        self.tb.connect(src, dst)
58        self.tb.run()
59        result_data = dst.data()
60        self.assertEqual(expected_result, result_data)
61
62    def test_002(self):
63        # Test vectors (the gnuradio vector I/O type)
64        src_data = [float(x) for x in range(16)]
65        expected_result = tuple(src_data)
66
67        src = blocks.vector_source_f(src_data, False, 2)
68        dst = blocks.vector_sink_f(2)
69
70        self.tb.connect(src, dst)
71        self.tb.run()
72        result_data = dst.data()
73        self.assertEqual(expected_result, result_data)
74
75    def test_003(self):
76        # Test that we can only make vectors (the I/O type) if the input
77        # vector has sufficient size
78        src_data = [float(x) for x in range(16)]
79        expected_result = tuple(src_data)
80        self.assertRaises(RuntimeError, lambda : blocks.vector_source_f(src_data, False, 3))
81
82    def test_004(self):
83        # Test sending and receiving tagged streams
84        src_data = [float(x) for x in range(16)]
85        expected_result = tuple(src_data)
86        src_tags = tuple([make_tag('key', 'val', 0, 'src')])
87        expected_tags = src_tags[:]
88
89        src = blocks.vector_source_f(src_data, repeat=False, tags=src_tags)
90        dst = blocks.vector_sink_f()
91
92        self.tb.connect(src, dst)
93        self.tb.run()
94        result_data = dst.data()
95        result_tags = dst.tags()
96        self.assertEqual(expected_result, result_data)
97        self.assertEqual(len(result_tags), 1)
98        self.assertTrue(compare_tags(expected_tags[0], result_tags[0]))
99
100    def test_005(self):
101        # Test that repeat works (with tagged streams)
102        length = 16
103        src_data = [float(x) for x in range(length)]
104        expected_result = tuple(src_data + src_data)
105        src_tags = tuple([make_tag('key', 'val', 0, 'src')])
106        expected_tags = tuple([make_tag('key', 'val', 0, 'src'),
107                               make_tag('key', 'val', length, 'src')])
108
109        src = blocks.vector_source_f(src_data, repeat=True, tags=src_tags)
110        head = blocks.head(gr.sizeof_float, 2*length)
111        dst = blocks.vector_sink_f()
112
113        self.tb.connect(src, head, dst)
114        self.tb.run()
115        result_data = dst.data()
116        result_tags = dst.tags()
117        self.assertEqual(expected_result, result_data)
118        self.assertEqual(len(result_tags), 2)
119        self.assertTrue(compare_tags(expected_tags[0], result_tags[0]))
120        self.assertTrue(compare_tags(expected_tags[1], result_tags[1]))
121
122    def test_006(self):
123        # Test set_data
124        src_data = [float(x) for x in range(16)]
125        expected_result = tuple(src_data)
126
127        src = blocks.vector_source_f((3,1,4))
128        dst = blocks.vector_sink_f()
129        src.set_data(src_data)
130
131        self.tb.connect(src, dst)
132        self.tb.run()
133        result_data = dst.data()
134        self.assertEqual(expected_result, result_data)
135
136    def test_007(self):
137        # Test set_repeat
138        src_data = [float(x) for x in range(16)]
139        expected_result = tuple(src_data)
140
141        src = blocks.vector_source_f(src_data, True)
142        dst = blocks.vector_sink_f()
143        src.set_repeat(False)
144
145        self.tb.connect(src, dst)
146        # will timeout if set_repeat does not work
147        self.tb.run()
148        result_data = dst.data()
149        self.assertEqual(expected_result, result_data)
150
151
152if __name__ == '__main__':
153    gr_unittest.run(test_vector_sink_source, "test_vector_sink_source.xml")
154
155