1 /* -*- c++ -*- */
2 /*
3  * Copyright 2012,2014 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include "stream_mux_impl.h"
28 #include <gnuradio/io_signature.h>
29 #include <boost/foreach.hpp>
30 #include <cstring>
31 
32 namespace gr {
33 namespace blocks {
34 
make(size_t itemsize,const std::vector<int> & lengths)35 stream_mux::sptr stream_mux::make(size_t itemsize, const std::vector<int>& lengths)
36 {
37     return gnuradio::get_initial_sptr(new stream_mux_impl(itemsize, lengths));
38 }
39 
stream_mux_impl(size_t itemsize,const std::vector<int> & lengths)40 stream_mux_impl::stream_mux_impl(size_t itemsize, const std::vector<int>& lengths)
41     : block("stream_mux",
42             io_signature::make(1, -1, itemsize),
43             io_signature::make(1, 1, itemsize)),
44       d_itemsize(itemsize),
45       d_stream(0),
46       d_residual(0),
47       d_lengths(lengths)
48 {
49     while (d_lengths[d_stream] == 0) {
50         d_stream++;
51         if (d_stream == d_lengths.size()) {
52             throw std::invalid_argument("At least one size must be non-zero.");
53         }
54     }
55     d_residual = d_lengths[d_stream];
56     set_tag_propagation_policy(TPP_DONT);
57 }
58 
forecast(int noutput_items,gr_vector_int & ninput_items_required)59 void stream_mux_impl::forecast(int noutput_items, gr_vector_int& ninput_items_required)
60 {
61     unsigned ninputs = ninput_items_required.size();
62     for (unsigned i = 0; i < ninputs; i++) {
63         // Only active inputs *need* items, for the rest, it would just be nice
64         ninput_items_required[i] = (d_stream == i ? 1 : 0);
65     }
66 }
67 
68 
general_work(int noutput_items,gr_vector_int & ninput_items,gr_vector_const_void_star & input_items,gr_vector_void_star & output_items)69 int stream_mux_impl::general_work(int noutput_items,
70                                   gr_vector_int& ninput_items,
71                                   gr_vector_const_void_star& input_items,
72                                   gr_vector_void_star& output_items)
73 {
74     char* out = (char*)output_items[0];
75     const char* in;
76     int out_index = 0;                              // Items written
77     gr_vector_int input_index(d_lengths.size(), 0); // Items read
78     std::vector<gr::tag_t> stream_t;
79 
80     while (out_index < noutput_items) {
81         if (ninput_items[d_stream] <= input_index[d_stream]) {
82             break;
83         }
84         int space_left_in_buffers = std::min(
85             noutput_items - out_index,                     // Space left in output buffer
86             ninput_items[d_stream] - input_index[d_stream] // Space left in input buffer
87         );
88         int items_to_copy = std::min(space_left_in_buffers, d_residual);
89         in = (const char*)input_items[d_stream] + input_index[d_stream] * d_itemsize;
90         memcpy(&out[out_index * d_itemsize], in, items_to_copy * d_itemsize);
91         get_tags_in_window(stream_t,
92                            d_stream,
93                            input_index[d_stream],
94                            input_index[d_stream] + items_to_copy);
95         BOOST_FOREACH (gr::tag_t t, stream_t) {
96             t.offset = t.offset - nitems_read(d_stream) - input_index[d_stream] +
97                        nitems_written(0) + out_index;
98             add_item_tag(0, t);
99         }
100 
101         out_index += items_to_copy;
102         input_index[d_stream] += items_to_copy;
103         d_residual -= items_to_copy;
104         if (d_residual == 0) {
105             do { // Skip all those inputs with zero length
106                 d_stream = (d_stream + 1) % d_lengths.size();
107             } while (d_lengths[d_stream] == 0);
108             d_residual = d_lengths[d_stream];
109         } else {
110             break;
111         }
112     } // while
113 
114     for (size_t i = 0; i < input_index.size(); i++) {
115         consume((int)i, input_index[i]);
116     }
117 
118     return out_index;
119 } /* work */
120 
121 } /* namespace blocks */
122 } /* namespace gr */
123