#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2015 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

'''
QA tests for the digital.burst_shaper_ff and digital.burst_shaper_cc blocks.

Every test feeds a tagged vector source through a burst shaper into a vector
sink and compares both the output samples and the output tags against
hand-built expectations.
'''

from gnuradio import gr, gr_unittest
from gnuradio import blocks, digital
import pmt
import numpy as np
import sys


def make_length_tag(offset, length):
    '''
    Build a 'packet_len' stream tag at `offset` whose value is `length`.

    Delegates to make_tag so both helpers stamp the same srcid.
    '''
    return make_tag(offset, 'packet_len', pmt.from_long(length))


def make_tag(offset, key, value):
    '''
    Build a stream tag at `offset`.

    `key` is a plain Python string (interned here); `value` must already be
    a PMT object.
    '''
    return gr.python_to_tag({'offset': offset,
                             'key': pmt.intern(key),
                             'value': value,
                             'srcid': pmt.intern('qa_burst_shaper')})


def compare_tags(a, b):
    '''
    Return True when tags `a` and `b` agree on offset, key and value.

    The srcid field is deliberately not compared.
    '''
    return a.offset == b.offset and pmt.equal(a.key, b.key) and \
        pmt.equal(a.value, b.value)


class qa_burst_shaper(gr_unittest.TestCase):

    def setUp(self):
        self.tb = gr.top_block()

    def tearDown(self):
        self.tb = None

    def _assert_tags_match(self, actual, expected):
        '''
        Assert that `actual` and `expected` hold the same tags, in order.

        The length is checked first so that a missing or surplus tag fails
        the test instead of being silently ignored by a pairwise walk.
        '''
        actual = list(actual)
        expected = list(expected)
        self.assertEqual(len(actual), len(expected))
        for index, (got, want) in enumerate(zip(actual, expected)):
            self.assertTrue(compare_tags(got, want),
                            'tag %d does not match' % index)

    def test_ff(self):
        '''
        test_ff: test with float values, even length window, zero padding,
                 and no phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        # Without phasing the window flanks overwrite the burst edges, so
        # only length - len(window) samples pass through unshaped.
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length - len(window)),
                                   window[5:10], np.zeros(postpad)))
        etag = make_length_tag(0, length + prepad + postpad)

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_cc(self):
        '''
        test_cc: test with complex values, even length window, zero padding,
                 and no phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length, dtype=complex)
        window = np.concatenate((-2.0*np.ones(5, dtype=complex),
                                 -4.0*np.ones(5, dtype=complex)))
        tags = (make_length_tag(0, length),)
        expected = np.concatenate((np.zeros(prepad, dtype=complex),
                                   window[0:5],
                                   np.ones(length - len(window),
                                           dtype=complex),
                                   window[5:10],
                                   np.zeros(postpad, dtype=complex)))
        etag = make_length_tag(0, length + prepad + postpad)

        # flowgraph
        source = blocks.vector_source_c(data, tags=tags)
        shaper = digital.burst_shaper_cc(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_c()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertComplexTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_ff_with_phasing(self):
        '''
        test_ff_with_phasing: test with float values, even length window, zero
                              padding, and phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        # Alternating +1/-1 phasing symbols, one per flank sample.
        phasing = np.array([(-1.0)**i for i in range(5)])
        # With phasing the shaped flanks are inserted around the burst, so
        # the whole burst passes through and the output grows by len(window).
        expected = np.concatenate((np.zeros(prepad), phasing*window[0:5],
                                   np.ones(length), phasing*window[5:10],
                                   np.zeros(postpad)))
        etag = make_length_tag(0, length + prepad + postpad + len(window))

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad,
                                         insert_phasing=True)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_cc_with_phasing(self):
        '''
        test_cc_with_phasing: test with complex values, even length window,
                              zero padding, and phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length, dtype=complex)
        window = np.concatenate((-2.0*np.ones(5, dtype=complex),
                                 -4.0*np.ones(5, dtype=complex)))
        tags = (make_length_tag(0, length),)
        # Alternating +1/-1 phasing symbols, one per flank sample.
        phasing = np.array([complex((-1.0)**i) for i in range(5)],
                           dtype=complex)
        expected = np.concatenate((np.zeros(prepad, dtype=complex),
                                   phasing*window[0:5],
                                   np.ones(length, dtype=complex),
                                   phasing*window[5:10],
                                   np.zeros(postpad, dtype=complex)))
        etag = make_length_tag(0, length + prepad + postpad + len(window))

        # flowgraph
        source = blocks.vector_source_c(data, tags=tags)
        shaper = digital.burst_shaper_cc(window, pre_padding=prepad,
                                         post_padding=postpad,
                                         insert_phasing=True)
        sink = blocks.vector_sink_c()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertComplexTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_odd_window(self):
        '''
        test_odd_window: test with odd length window; center sample should be
                         applied at end of up flank and beginning of down flank
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -3.0*np.ones(1),
                                 -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        # The center sample (index 5) appears in both flanks, hence the two
        # overlapping 6-sample slices and the extra "- 1" in the middle run.
        expected = np.concatenate((np.zeros(prepad), window[0:6],
                                   np.ones(length - len(window) - 1),
                                   window[5:11], np.zeros(postpad)))
        etag = make_length_tag(0, length + prepad + postpad)

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_short_burst(self):
        '''
        test_short_burst: test with burst length shorter than window length;
                          clips the window up and down flanks to
                          FLOOR(length/2) samples
        '''
        prepad = 10
        postpad = 10
        length = 9
        data = np.ones(length)
        window = np.arange(length + 2, dtype=float)
        tags = (make_length_tag(0, length),)
        # FLOOR(9/2) = 4 samples per flank, leaving one unshaped sample.
        expected = np.concatenate((np.zeros(prepad), window[0:4],
                                   np.ones(1), window[5:9],
                                   np.zeros(postpad)))
        etag = make_length_tag(0, length + prepad + postpad)

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), (etag,))

    def test_consecutive_bursts(self):
        '''
        test_consecutive_bursts: test with consecutive bursts of different
                                 lengths
        '''
        prepad = 10
        postpad = 10
        length1 = 15
        length2 = 25
        data = np.concatenate((np.ones(length1), -1.0*np.ones(length2)))
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length1),
                make_length_tag(length1, length2))
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length1 - len(window)),
                                   window[5:10],
                                   np.zeros(postpad + prepad),
                                   -1.0*window[0:5],
                                   -1.0*np.ones(length2 - len(window)),
                                   -1.0*window[5:10], np.zeros(postpad)))
        etags = (make_length_tag(0, length1 + prepad + postpad),
                 make_length_tag(length1 + prepad + postpad,
                                 length2 + prepad + postpad))

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), etags)

    def test_tag_gap(self):
        '''
        test_tag_gap: test with gap between tags; should drop samples that are
                      between proper tagged streams
        '''
        prepad = 10
        postpad = 10
        length = 20
        gap_len = 5
        data = np.arange(2*length + gap_len, dtype=float)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        # Window with the alternating phasing symbols already applied to
        # each 5-sample flank.
        ewindow = window * np.array([1, -1, 1, -1, 1, 1, -1, 1, -1, 1],
                                    dtype=float)
        tags = (make_length_tag(0, length),
                make_length_tag(length + gap_len, length))
        expected = np.concatenate((np.zeros(prepad), ewindow[0:5],
                                   np.arange(0, length, dtype=float),
                                   ewindow[5:10], np.zeros(postpad),
                                   np.zeros(prepad), ewindow[0:5],
                                   np.arange(length + gap_len,
                                             2*length + gap_len,
                                             dtype=float),
                                   ewindow[5:10], np.zeros(postpad)))
        burst_len = length + len(window) + prepad + postpad
        etags = (make_length_tag(0, burst_len),
                 make_length_tag(burst_len, burst_len))

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad,
                                         insert_phasing=True)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_tags_match(sink.tags(), etags)

    def test_tag_propagation(self):
        '''
        test_tag_propagation: test that non length tags are handled correctly
        '''
        prepad = 10
        postpad = 10
        length1 = 15
        length2 = 25
        gap_len = 5
        lentag1_offset = 0
        lentag2_offset = length1 + gap_len
        tag1_offset = 0                         # accompanies first length tag
        tag2_offset = length1 + gap_len         # accompanies second length tag
        tag3_offset = 2                         # in ramp-up state
        tag4_offset = length1 + 2               # in gap; tag will be dropped
        tag5_offset = length1 + gap_len + 7     # in copy state

        data = np.concatenate((np.ones(length1), np.zeros(gap_len),
                               -1.0*np.ones(length2)))
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(lentag1_offset, length1),
                make_length_tag(lentag2_offset, length2),
                make_tag(tag1_offset, 'head', pmt.intern('tag1')),
                make_tag(tag2_offset, 'head', pmt.intern('tag2')),
                make_tag(tag3_offset, 'body', pmt.intern('tag3')),
                make_tag(tag4_offset, 'body', pmt.intern('tag4')),
                make_tag(tag5_offset, 'body', pmt.intern('tag5')))
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length1 - len(window)),
                                   window[5:10],
                                   np.zeros(postpad + prepad),
                                   -1.0*window[0:5],
                                   -1.0*np.ones(length2 - len(window)),
                                   -1.0*window[5:10], np.zeros(postpad)))
        elentag1_offset = 0
        elentag2_offset = length1 + prepad + postpad
        etag1_offset = 0
        etag2_offset = elentag2_offset
        etag3_offset = prepad + tag3_offset
        etag5_offset = 2*prepad + postpad + tag5_offset - gap_len
        # tag4 is intentionally absent: it sits in the gap between bursts.
        etags = (make_length_tag(elentag1_offset, length1 + prepad + postpad),
                 make_length_tag(elentag2_offset, length2 + prepad + postpad),
                 make_tag(etag1_offset, 'head', pmt.intern('tag1')),
                 make_tag(etag2_offset, 'head', pmt.intern('tag2')),
                 make_tag(etag3_offset, 'body', pmt.intern('tag3')),
                 make_tag(etag5_offset, 'body', pmt.intern('tag5')))

        # flowgraph
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad)
        sink = blocks.vector_sink_f()
        self.tb.connect(source, shaper, sink)
        self.tb.run()

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        # Both sides are ordered by offset before the pairwise comparison;
        # the helper also verifies the tag count, so a dropped or stray tag
        # cannot slip through a truncating zip().
        self._assert_tags_match(
            sorted(sink.tags(), key=gr.tag_t_offset_compare_key()),
            sorted(etags, key=gr.tag_t_offset_compare_key()))


if __name__ == '__main__':
    gr_unittest.run(qa_burst_shaper, "qa_burst_shaper.xml")