#!/usr/bin/env python
#
# Copyright 2018 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

import os
import tempfile
import array
import pmt
from gnuradio import gr, gr_unittest, blocks


class test_file_source(gr_unittest.TestCase):
    """QA tests for ``blocks.file_source``.

    A temporary file holding 1000 floats (0.0 .. 999.0) is written once
    for the whole class; each test reads it back through a file source.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared temporary data file and the reference vector."""
        os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
        cls._datafile = tempfile.NamedTemporaryFile()
        cls._datafilename = cls._datafile.name
        cls._vector = list(range(1000))
        # The file is re-opened by name so the data is written and flushed
        # through an ordinary file object before any test reads it.
        with open(cls._datafilename, 'wb') as f:
            array.array('f', cls._vector).tofile(f)

    @classmethod
    def tearDownClass(cls):
        """Close (and thereby remove) the temporary file, then drop fixtures."""
        # Close explicitly instead of relying on garbage collection to
        # release the handle and delete the file.
        cls._datafile.close()
        del cls._vector
        del cls._datafilename
        del cls._datafile

    def setUp(self):
        """Create a fresh top block for each test."""
        self.tb = gr.top_block()

    def tearDown(self):
        """Release the top block created in setUp."""
        self.tb = None

    def test_file_source(self):
        """Read the whole file and expect the full vector and no tags."""
        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(self._vector, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_file_source_no_such_file(self):
        """
        Try to open a non-existent file and verify exception is thrown.
        """
        with self.assertRaises(RuntimeError):
            blocks.file_source(gr.sizeof_float, "___no_such_file___")

    def test_file_source_with_offset(self):
        """Read with offset=100 and expect the vector from item 100 on."""
        expected_result = self._vector[100:]

        src = blocks.file_source(gr.sizeof_float, self._datafilename,
                                 offset=100)
        snk = blocks.vector_sink_f()

        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_source_with_offset_and_len(self):
        """Read with offset=100, len=600 and expect items 100..699."""
        expected_result = self._vector[100:100 + 600]

        src = blocks.file_source(gr.sizeof_float, self._datafilename,
                                 offset=100, len=600)
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 0)

    def test_file_source_can_seek_after_open(self):
        """Check which seek() calls succeed for each whence mode."""
        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        self.assertTrue(src.seek(0, os.SEEK_SET))
        self.assertTrue(src.seek(len(self._vector) - 1, os.SEEK_SET))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(len(self._vector), os.SEEK_SET))
        # Negative seek - this will also log a warning
        self.assertFalse(src.seek(-1, os.SEEK_SET))

        self.assertTrue(src.seek(1, os.SEEK_END))
        self.assertTrue(src.seek(len(self._vector), os.SEEK_END))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(0, os.SEEK_END))

        self.assertTrue(src.seek(0, os.SEEK_SET))
        self.assertTrue(src.seek(1, os.SEEK_CUR))
        # Seek past end of file - this will also log a warning
        self.assertFalse(src.seek(len(self._vector), os.SEEK_CUR))

    def test_begin_tag(self):
        """With a begin tag set, a single pass emits exactly one tag."""
        expected_result = self._vector

        src = blocks.file_source(gr.sizeof_float, self._datafilename)
        src.set_begin_tag(pmt.string_to_symbol("file_begin"))
        snk = blocks.vector_sink_f()
        self.tb.connect(src, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        self.assertEqual(len(snk.tags()), 1)

    def test_begin_tag_repeat(self):
        """Two passes over the file emit two begin tags, one per pass."""
        expected_result = self._vector + self._vector

        # Third positional argument is True -- presumably the repeat flag,
        # going by the test name; confirm against the block's signature.
        src = blocks.file_source(gr.sizeof_float, self._datafilename, True)
        src.set_begin_tag(pmt.string_to_symbol("file_begin"))
        # head stops the flowgraph after exactly two copies of the vector.
        head = blocks.head(gr.sizeof_float, 2 * len(self._vector))
        snk = blocks.vector_sink_f()
        self.tb.connect(src, head, snk)
        self.tb.run()

        result_data = snk.data()
        self.assertFloatTuplesAlmostEqual(expected_result, result_data)
        tags = snk.tags()
        self.assertEqual(len(tags), 2)
        self.assertEqual(str(tags[0].key), "file_begin")
        self.assertEqual(str(tags[0].value), "0")
        self.assertEqual(tags[0].offset, 0)
        self.assertEqual(str(tags[1].key), "file_begin")
        self.assertEqual(str(tags[1].value), "1")
        # The second tag sits at the start of the second copy of the data.
        self.assertEqual(tags[1].offset, len(self._vector))


if __name__ == '__main__':
    gr_unittest.run(test_file_source, "test_file_source.xml")