1#!/usr/bin/env python
2#
3# Copyright 2018 Free Software Foundation, Inc.
4#
5# This file is part of GNU Radio
6#
7# GNU Radio is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 3, or (at your option)
10# any later version.
11#
12# GNU Radio is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with GNU Radio; see the file COPYING.  If not, write to
19# the Free Software Foundation, Inc., 51 Franklin Street,
20# Boston, MA 02110-1301, USA.
21#
22
23import os
24import tempfile
25import array
26import pmt
27from gnuradio import gr, gr_unittest, blocks
28
class test_file_source(gr_unittest.TestCase):
    """QA tests for ``blocks.file_source``.

    All tests read from one shared temporary file that holds the integers
    0..999 written as C floats (``array`` typecode ``'f'``).
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared data file once for the whole test case."""
        os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
        # Keep the NamedTemporaryFile object alive for the lifetime of the
        # class: the backing file is removed when that object is closed.
        cls._datafile = tempfile.NamedTemporaryFile()
        cls._datafilename = cls._datafile.name
        cls._vector = list(range(1000))
        with open(cls._datafilename, 'wb') as f:
            array.array('f', cls._vector).tofile(f)

    @classmethod
    def tearDownClass(cls):
        """Release the shared data file and drop the class-level fixtures."""
        # Close explicitly so the temporary file is deleted deterministically
        # instead of whenever the garbage collector finalizes the object.
        cls._datafile.close()
        del cls._vector
        del cls._datafilename
        del cls._datafile

    def setUp(self):
        """Give every test a fresh top block."""
        self.tb = gr.top_block()

    def tearDown(self):
        """Drop the reference to the top block used by the test."""
        self.tb = None

    def test_file_source(self):
        """Read the whole file and expect the original vector and no tags."""
        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(self._vector, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_file_source_no_such_file(self):
        """
        Try to open a non-existent file and verify exception is thrown.
        """
        with self.assertRaises(RuntimeError):
            blocks.file_source(gr.sizeof_float, "___no_such_file___")

    def test_file_source_with_offset(self):
        """Read with ``offset=100`` and expect the first 100 items skipped."""
        expected_result = self._vector[100:]

        src = blocks.file_source(gr.sizeof_float, self._datafilename, offset=100)
        snk = blocks.vector_sink_f()

        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_source_with_offset_and_len(self):
        """Read with ``offset=100, len=600`` and expect items 100..699."""
        expected_result = self._vector[100:100+600]

        src = blocks.file_source(gr.sizeof_float, self._datafilename, offset=100, len=600)
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_file_source_can_seek_after_open(self):
        """Check the return value of ``seek`` for each ``whence`` mode."""
        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        self.assertTrue(src.seek(0, os.SEEK_SET))
        self.assertTrue(src.seek(len(self._vector)-1, os.SEEK_SET))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(len(self._vector), os.SEEK_SET))
        # Negative seek - this will also log a warning
        self.assertFalse(src.seek(-1, os.SEEK_SET))

        self.assertTrue(src.seek(1, os.SEEK_END))
        self.assertTrue(src.seek(len(self._vector), os.SEEK_END))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(0, os.SEEK_END))

        self.assertTrue(src.seek(0, os.SEEK_SET))
        self.assertTrue(src.seek(1, os.SEEK_CUR))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(len(self._vector), os.SEEK_CUR))

    def test_begin_tag(self):
        """Set a begin tag and expect exactly one tag on a single pass."""
        expected_result = self._vector

        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        src.set_begin_tag(pmt.string_to_symbol("file_begin"))
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 1)

    def test_begin_tag_repeat(self):
        """Read the file twice and expect one begin tag per pass.

        The third positional argument ``True`` is presumably the repeat
        flag (confirm against the ``file_source`` signature); a ``head``
        block stops the flowgraph after two file lengths.
        """
        expected_result = self._vector + self._vector

        src = blocks.file_source(gr.sizeof_float, self._datafilename, True)
        src.set_begin_tag(pmt.string_to_symbol("file_begin"))
        head = blocks.head(gr.sizeof_float, 2 * len(self._vector))
        snk = blocks.vector_sink_f()
        self.tb.connect(src, head, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        tags = snk.tags()
        self.assertEqual(len(tags), 2)
        self.assertEqual(str(tags[0].key), "file_begin")
        self.assertEqual(str(tags[0].value), "0")
        self.assertEqual(tags[0].offset, 0)
        self.assertEqual(str(tags[1].key), "file_begin")
        self.assertEqual(str(tags[1].value), "1")
        # The second tag sits where the second pass starts, i.e. exactly one
        # file length into the stream.
        self.assertEqual(tags[1].offset, len(self._vector))
151
# Run this QA suite when the file is executed directly as a script.
if __name__ == "__main__":
    gr_unittest.run(test_file_source, "test_file_source.xml")
154