1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2015 Free Software Foundation, Inc.
5#
6# This file is part of GNU Radio
7#
8# GNU Radio is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 3, or (at your option)
11# any later version.
12#
13# GNU Radio is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with GNU Radio; see the file COPYING.  If not, write to
20# the Free Software Foundation, Inc., 51 Franklin Street,
21# Boston, MA 02110-1301, USA.
22#
23
24
25from gnuradio import gr, gr_unittest
26from gnuradio import blocks, digital
27import pmt
28import numpy as np
29import sys
30
def make_length_tag(offset, length):
    '''
    Build a 'packet_len' stream tag.

    offset: value stored in the tag's 'offset' field
    length: integer wrapped with pmt.from_long and stored as the tag value

    Returns the result of gr.python_to_tag for those fields, with
    'qa_burst_shaper' as the source id.
    '''
    tag_fields = {
        'offset': offset,
        'key': pmt.intern('packet_len'),
        'value': pmt.from_long(length),
        'srcid': pmt.intern('qa_burst_shaper'),
    }
    return gr.python_to_tag(tag_fields)
36
def make_tag(offset, key, value):
    '''
    Build a stream tag with an arbitrary key and value.

    offset: value stored in the tag's 'offset' field
    key:    string that is interned with pmt.intern to form the tag key
    value:  object stored unchanged as the tag value (callers in this file
            pass PMT objects)

    Returns the result of gr.python_to_tag for those fields, with
    'qa_burst_shaper' as the source id.
    '''
    tag_fields = dict(offset=offset,
                      key=pmt.intern(key),
                      value=value,
                      srcid=pmt.intern('qa_burst_shaper'))
    return gr.python_to_tag(tag_fields)
42
def compare_tags(a, b):
    '''
    Return a true value when tags `a` and `b` agree on offset, key and value.

    Offsets are compared with ==; keys and values are compared with
    pmt.equal. The source id is not part of the comparison. The checks run
    in that order and stop at the first mismatch.
    '''
    offsets_match = a.offset == b.offset
    if not offsets_match:
        return False
    keys_match = pmt.equal(a.key, b.key)
    return keys_match and pmt.equal(a.value, b.value)
46
class qa_burst_shaper(gr_unittest.TestCase):
    '''
    QA tests for digital.burst_shaper_ff and digital.burst_shaper_cc.

    Every test feeds a tagged vector source into a burst shaper, collects
    the output in a vector sink and compares both the output samples and
    the output tags against hand-built expectations.
    '''

    def setUp(self):
        # Each test builds its flowgraph on its own top block.
        self.tb = gr.top_block()

    def tearDown(self):
        # Drop the reference to the top block used by the finished test.
        self.tb = None

    # ------------------------------------------------------------------
    # helpers (names do not start with 'test', so they are not collected)
    # ------------------------------------------------------------------

    def _run_flowgraph(self, source, shaper, sink):
        '''
        Connect source -> shaper -> sink on self.tb, run the flowgraph and
        return the sink so the caller can inspect its data and tags.
        '''
        self.tb.connect(source, shaper, sink)
        self.tb.run()
        return sink

    def _shape_floats(self, data, tags, window, prepad, postpad,
                      **shaper_args):
        '''
        Run `data`/`tags` through a burst_shaper_ff and return the float
        vector sink holding the result.

        Extra keyword arguments (e.g. insert_phasing=True) are forwarded to
        the shaper; when none are given the shaper is constructed with only
        the window and padding arguments.
        '''
        source = blocks.vector_source_f(data, tags=tags)
        shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
                                         post_padding=postpad, **shaper_args)
        return self._run_flowgraph(source, shaper, blocks.vector_sink_f())

    def _shape_complex(self, data, tags, window, prepad, postpad,
                       **shaper_args):
        '''
        Complex counterpart of _shape_floats: uses vector_source_c,
        burst_shaper_cc and vector_sink_c.
        '''
        source = blocks.vector_source_c(data, tags=tags)
        shaper = digital.burst_shaper_cc(window, pre_padding=prepad,
                                         post_padding=postpad, **shaper_args)
        return self._run_flowgraph(source, shaper, blocks.vector_sink_c())

    def _assert_leading_tags(self, sink, etags):
        '''
        Assert that the first len(etags) tags recorded by `sink`, in the
        order the sink reports them, match `etags` according to
        compare_tags. Indexing past the end of the sink's tag list raises
        IndexError, so too few output tags also makes the test fail.
        '''
        out_tags = sink.tags()
        for idx, want in enumerate(etags):
            self.assertTrue(compare_tags(out_tags[idx], want),
                            'output tag %d does not match expected tag' % idx)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_ff(self):
        '''
        test_ff: test with float values, even length window, zero padding,
            and no phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length - len(window)), window[5:10],
                                   np.zeros(postpad)))
        etags = (make_length_tag(0, length + prepad + postpad),)

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_cc(self):
        '''
        test_cc: test with complex values, even length window, zero padding,
            and no phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length, dtype=complex)
        window = np.concatenate((-2.0*np.ones(5, dtype=complex),
                                 -4.0*np.ones(5, dtype=complex)))
        tags = (make_length_tag(0, length),)
        expected = np.concatenate((np.zeros(prepad, dtype=complex),
                                   window[0:5],
                                   np.ones(length - len(window),
                                           dtype=complex),
                                   window[5:10],
                                   np.zeros(postpad, dtype=complex)))
        etags = (make_length_tag(0, length + prepad + postpad),)

        # flowgraph
        sink = self._shape_complex(data, tags, window, prepad, postpad)

        # checks
        self.assertComplexTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_ff_with_phasing(self):
        '''
        test_ff_with_phasing: test with float values, even length window, zero
            padding, and phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        # alternating +1/-1 pattern, starting with +1
        phasing = (-1.0) ** np.arange(5)
        expected = np.concatenate((np.zeros(prepad), phasing*window[0:5],
                                   np.ones(length), phasing*window[5:10],
                                   np.zeros(postpad)))
        # with phasing the expected burst grows by the window length
        etags = (make_length_tag(0, length + prepad + postpad + len(window)),)

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad,
                                  insert_phasing=True)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_cc_with_phasing(self):
        '''
        test_cc_with_phasing: test with complex values, even length window, zero
            padding, and phasing
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length, dtype=complex)
        window = np.concatenate((-2.0*np.ones(5, dtype=complex),
                                 -4.0*np.ones(5, dtype=complex)))
        tags = (make_length_tag(0, length),)
        # alternating +1/-1 pattern, starting with +1, as complex values
        phasing = ((-1.0) ** np.arange(5)).astype(complex)
        expected = np.concatenate((np.zeros(prepad, dtype=complex),
                                   phasing*window[0:5],
                                   np.ones(length, dtype=complex),
                                   phasing*window[5:10],
                                   np.zeros(postpad, dtype=complex)))
        # with phasing the expected burst grows by the window length
        etags = (make_length_tag(0, length + prepad + postpad + len(window)),)

        # flowgraph
        sink = self._shape_complex(data, tags, window, prepad, postpad,
                                   insert_phasing=True)

        # checks
        self.assertComplexTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_odd_window(self):
        '''
        test_odd_window: test with odd length window; center sample should be
            applied at end of up flank and beginning of down flank
        '''
        prepad = 10
        postpad = 10
        length = 20
        data = np.ones(length)
        window = np.concatenate((-2.0*np.ones(5), -3.0*np.ones(1),
                                 -4.0*np.ones(5)))
        tags = (make_length_tag(0, length),)
        # window[5] (the center sample) appears in both flanks
        expected = np.concatenate((np.zeros(prepad), window[0:6],
                                   np.ones(length - len(window) - 1),
                                   window[5:11], np.zeros(postpad)))
        etags = (make_length_tag(0, length + prepad + postpad),)

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_short_burst(self):
        '''
        test_short_burst: test with burst length shorter than window length;
            clips the window up and down flanks to FLOOR(length/2) samples
        '''
        prepad = 10
        postpad = 10
        length = 9
        data = np.ones(length)
        window = np.arange(length + 2, dtype=float)
        tags = (make_length_tag(0, length),)
        expected = np.concatenate((np.zeros(prepad), window[0:4],
                                   np.ones(1), window[5:9],
                                   np.zeros(postpad)))
        etags = (make_length_tag(0, length + prepad + postpad),)

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_consecutive_bursts(self):
        '''
        test_consecutive_bursts: test with consecutive bursts of different
            lengths
        '''
        prepad = 10
        postpad = 10
        length1 = 15
        length2 = 25
        data = np.concatenate((np.ones(length1), -1.0*np.ones(length2)))
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(0, length1), make_length_tag(length1, length2))
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length1 - len(window)), window[5:10],
                                   np.zeros(postpad + prepad), -1.0*window[0:5],
                                   -1.0*np.ones(length2 - len(window)),
                                   -1.0*window[5:10], np.zeros(postpad)))
        etags = (make_length_tag(0, length1 + prepad + postpad),
                 make_length_tag(length1 + prepad + postpad,
                                 length2 + prepad + postpad))

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_tag_gap(self):
        '''
        test_tag_gap: test with gap between tags; should drop samples that are
            between proper tagged streams
        '''
        prepad = 10
        postpad = 10
        length = 20
        gap_len = 5
        data = np.arange(2*length + gap_len, dtype=float)
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        # window with the alternating phasing signs applied to each flank
        ewindow = window * np.array([1, -1, 1, -1, 1, 1, -1, 1, -1, 1],
                                    dtype=float)
        tags = (make_length_tag(0, length),
                make_length_tag(length + gap_len, length))
        expected = np.concatenate((np.zeros(prepad), ewindow[0:5],
                                   np.arange(0, length, dtype=float),
                                   ewindow[5:10], np.zeros(postpad),
                                   np.zeros(prepad), ewindow[0:5],
                                   np.arange(length + gap_len,
                                             2*length + gap_len, dtype=float),
                                   ewindow[5:10], np.zeros(postpad)))
        burst_len = length + len(window) + prepad + postpad
        etags = (make_length_tag(0, burst_len),
                 make_length_tag(burst_len, burst_len))

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad,
                                  insert_phasing=True)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        self._assert_leading_tags(sink, etags)

    def test_tag_propagation(self):
        '''
        test_tag_propagation: test that non length tags are handled correctly
        '''
        prepad = 10
        postpad = 10
        length1 = 15
        length2 = 25
        gap_len = 5
        lentag1_offset = 0
        lentag2_offset = length1 + gap_len
        tag1_offset = 0                     # accompanies first length tag
        tag2_offset = length1 + gap_len     # accompanies second length tag
        tag3_offset = 2                     # in ramp-up state
        tag4_offset = length1 + 2           # in gap; tag will be dropped
        tag5_offset = length1 + gap_len + 7 # in copy state

        data = np.concatenate((np.ones(length1), np.zeros(gap_len),
                               -1.0*np.ones(length2)))
        window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
        tags = (make_length_tag(lentag1_offset, length1),
                make_length_tag(lentag2_offset, length2),
                make_tag(tag1_offset, 'head', pmt.intern('tag1')),
                make_tag(tag2_offset, 'head', pmt.intern('tag2')),
                make_tag(tag3_offset, 'body', pmt.intern('tag3')),
                make_tag(tag4_offset, 'body', pmt.intern('tag4')),
                make_tag(tag5_offset, 'body', pmt.intern('tag5')))
        expected = np.concatenate((np.zeros(prepad), window[0:5],
                                   np.ones(length1 - len(window)), window[5:10],
                                   np.zeros(postpad + prepad), -1.0*window[0:5],
                                   -1.0*np.ones(length2 - len(window)),
                                   -1.0*window[5:10], np.zeros(postpad)))
        elentag1_offset = 0
        elentag2_offset = length1 + prepad + postpad
        etag1_offset = 0
        etag2_offset = elentag2_offset
        etag3_offset = prepad + tag3_offset
        etag5_offset = 2*prepad + postpad + tag5_offset - gap_len
        # tag4 is intentionally absent: it sits in the gap between bursts
        etags = (make_length_tag(elentag1_offset, length1 + prepad + postpad),
                 make_length_tag(elentag2_offset, length2 + prepad + postpad),
                 make_tag(etag1_offset, 'head', pmt.intern('tag1')),
                 make_tag(etag2_offset, 'head', pmt.intern('tag2')),
                 make_tag(etag3_offset, 'body', pmt.intern('tag3')),
                 make_tag(etag5_offset, 'body', pmt.intern('tag5')))

        # flowgraph
        sink = self._shape_floats(data, tags, window, prepad, postpad)

        # checks
        self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
        out_tags = sorted(sink.tags(), key=gr.tag_t_offset_compare_key())
        want_tags = sorted(etags, key=gr.tag_t_offset_compare_key())
        # zip() stops at the shorter sequence, so check the counts first;
        # otherwise a missing (or extra) trailing tag would go unnoticed.
        self.assertEqual(len(out_tags), len(want_tags))
        for got, want in zip(out_tags, want_tags):
            self.assertTrue(compare_tags(got, want))
368
369
# Script entry point: run the qa_burst_shaper test case through gr_unittest.
# NOTE(review): the second argument is presumably the name of the XML results
# file written by gr_unittest.run -- confirm against the gr_unittest API.
if __name__ == '__main__':
    gr_unittest.run(qa_burst_shaper, "qa_burst_shaper.xml")
372