1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more detail.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifndef INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H
24 #define INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H
25 
26 #include <gnuradio/api.h>
27 #include <gnuradio/high_res_timer.h>
28 #include <gnuradio/runtime_types.h>
29 #include <gnuradio/tags.h>
30 #include <gnuradio/tpb_detail.h>
31 #include <stdexcept>
32 
33 namespace gr {
34 
35 /*!
36  * \brief Implementation details to support the signal processing abstraction
37  * \ingroup internal
38  *
39  * This class contains implementation detail that should be "out of
40  * sight" of almost all users of GNU Radio.  This decoupling also
41  * means that we can make changes to the guts without having to
42  * recompile everything.
43  */
44 class GR_RUNTIME_API block_detail
45 {
46 public:
47     ~block_detail();
48 
ninputs()49     int ninputs() const { return d_ninputs; }
noutputs()50     int noutputs() const { return d_noutputs; }
sink_p()51     bool sink_p() const { return d_noutputs == 0; }
source_p()52     bool source_p() const { return d_ninputs == 0; }
53 
54     void set_done(bool done);
done()55     bool done() const { return d_done; }
56 
57     void set_input(unsigned int which, buffer_reader_sptr reader);
input(unsigned int which)58     buffer_reader_sptr input(unsigned int which)
59     {
60         if (which >= d_ninputs)
61             throw std::invalid_argument("block_detail::input");
62         return d_input[which];
63     }
64 
65     void set_output(unsigned int which, buffer_sptr buffer);
output(unsigned int which)66     buffer_sptr output(unsigned int which)
67     {
68         if (which >= d_noutputs)
69             throw std::invalid_argument("block_detail::output");
70         return d_output[which];
71     }
72 
73     /*!
74      * \brief Tell the scheduler \p how_many_items of input stream \p
75      * which_input were consumed.
76      */
77     void consume(int which_input, int how_many_items);
78 
79     /*!
80      * \brief Tell the scheduler \p how_many_items were consumed on
81      * each input stream.
82      */
83     void consume_each(int how_many_items);
84 
85     /*!
86      * \brief Tell the scheduler \p how_many_items were produced on
87      * output stream \p which_output.
88      */
89     void produce(int which_output, int how_many_items);
90 
91     /*!
92      * \brief Tell the scheduler \p how_many_items were produced on
93      * each output stream.
94      */
95     void produce_each(int how_many_items);
96 
97     // Return the number of items read on input stream which_input
98     uint64_t nitems_read(unsigned int which_input);
99 
100     // Return the number of items written on output stream which_output
101     uint64_t nitems_written(unsigned int which_output);
102 
103     // sets nitems_read and nitems_written to 0 for all input/output
104     // buffers.
105     void reset_nitem_counters();
106 
107     // Clears all tags from the input buffers.
108     void clear_tags();
109 
110     /*!
111      * \brief  Adds a new tag to the given output stream.
112      *
113      * Calls gr::buffer::add_item_tag(),
114      * which appends the tag onto its deque.
115      *
116      * \param which_output  an integer of which output stream to attach the tag
117      * \param tag the tag object to add
118      */
119     void add_item_tag(unsigned int which_output, const tag_t& tag);
120 
121     /*!
122      * \brief  Removes a tag from the given input stream.
123      *
124      * Calls gr::buffer::remove_item_tag().
125      * The tag in question will then no longer appear on subsequent calls of
126      * get_tags_in_range().
127      *
128      * \param which_input  an integer of which input stream to remove the tag from
129      * \param tag the tag object to add
130      * \param id The unique block ID (use gr::block::unique_id())
131      */
132     void remove_item_tag(unsigned int which_input, const tag_t& tag, long id);
133 
134     /*!
135      * \brief Given a [start,end), returns a vector of all tags in the range.
136      *
137      * Pass-through function to gr::buffer_reader to get a vector of
138      * tags in given range. Range of counts is from start to end-1.
139      *
140      * Tags are tuples of:
141      *      (item count, source id, key, value)
142      *
143      * \param v            a vector reference to return tags into
144      * \param which_input  an integer of which input stream to pull from
145      * \param abs_start    a uint64 count of the start of the range of interest
146      * \param abs_end      a uint64 count of the end of the range of interest
147      * \param id           Block ID
148      */
149     void get_tags_in_range(std::vector<tag_t>& v,
150                            unsigned int which_input,
151                            uint64_t abs_start,
152                            uint64_t abs_end,
153                            long id);
154 
155     /*!
156      * \brief Given a [start,end), returns a vector of all tags in the
157      * range with a given key.
158      *
159      * Calls get_tags_in_range(which_input, abs_start, abs_end) to get
160      * a vector of tags from the buffers. This function then provides
161      * a secondary filter to the tags to extract only tags with the
162      * given 'key'.
163      *
164      * Tags are tuples of:
165      *      (item count, source id, key, value)
166      *
167      * \param v            a vector reference to return tags into
168      * \param which_input  an integer of which input stream to pull from
169      * \param abs_start    a uint64 count of the start of the range of interest
170      * \param abs_end      a uint64 count of the end of the range of interest
171      * \param key          a PMT symbol to select only tags of this key
172      * \param id           Block ID
173      */
174     void get_tags_in_range(std::vector<tag_t>& v,
175                            unsigned int which_input,
176                            uint64_t abs_start,
177                            uint64_t abs_end,
178                            const pmt::pmt_t& key,
179                            long id);
180 
181     /*!
182      * \brief Set core affinity of block to the cores in the vector
183      * mask.
184      *
185      * \param mask a vector of ints of the core numbers available to
186      * this block.
187      */
188     void set_processor_affinity(const std::vector<int>& mask);
189 
190     /*!
191      * \brief Unset core affinity.
192      */
193     void unset_processor_affinity();
194 
195     /*!
196      * \brief Get the current thread priority
197      */
198     int thread_priority();
199 
200     /*!
201      * \brief Set the current thread priority
202      *
203      * \param priority the new thread priority to set
204      */
205     int set_thread_priority(int priority);
206 
207     bool threaded;                  // set if thread is currently running.
208     gr::thread::gr_thread_t thread; // portable thread handle
209 
210     void start_perf_counters();
211     void stop_perf_counters(int noutput_items, int nproduced);
212     void reset_perf_counters();
213 
214     // Calls to get performance counter items
215     float pc_noutput_items();
216     float pc_nproduced();
217     float pc_input_buffers_full(size_t which);
218     std::vector<float> pc_input_buffers_full();
219     float pc_output_buffers_full(size_t which);
220     std::vector<float> pc_output_buffers_full();
221     float pc_work_time();
222 
223     float pc_noutput_items_avg();
224     float pc_nproduced_avg();
225     float pc_input_buffers_full_avg(size_t which);
226     std::vector<float> pc_input_buffers_full_avg();
227     float pc_output_buffers_full_avg(size_t which);
228     std::vector<float> pc_output_buffers_full_avg();
229     float pc_work_time_avg();
230     float pc_throughput_avg();
231 
232     float pc_noutput_items_var();
233     float pc_nproduced_var();
234     float pc_input_buffers_full_var(size_t which);
235     std::vector<float> pc_input_buffers_full_var();
236     float pc_output_buffers_full_var(size_t which);
237     std::vector<float> pc_output_buffers_full_var();
238     float pc_work_time_var();
239 
240     float pc_work_time_total();
241 
242     tpb_detail d_tpb; // used by thread-per-block scheduler
243     int d_produce_or;
244 
245     int consumed() const;
246 
247     // ----------------------------------------------------------------------------
248 
249 private:
250     unsigned int d_ninputs;
251     unsigned int d_noutputs;
252     std::vector<buffer_reader_sptr> d_input;
253     std::vector<buffer_sptr> d_output;
254     bool d_done;
255     int d_consumed;
256 
257     // Performance counters
258     float d_ins_noutput_items;
259     float d_avg_noutput_items;
260     float d_var_noutput_items;
261     float d_total_noutput_items;
262     gr::high_res_timer_type d_pc_start_time;
263     gr::high_res_timer_type d_pc_last_work_time;
264     float d_ins_nproduced;
265     float d_avg_nproduced;
266     float d_var_nproduced;
267     std::vector<float> d_ins_input_buffers_full;
268     std::vector<float> d_avg_input_buffers_full;
269     std::vector<float> d_var_input_buffers_full;
270     std::vector<float> d_ins_output_buffers_full;
271     std::vector<float> d_avg_output_buffers_full;
272     std::vector<float> d_var_output_buffers_full;
273     gr::high_res_timer_type d_start_of_work, d_end_of_work;
274     float d_ins_work_time;
275     float d_avg_work_time;
276     float d_var_work_time;
277     float d_total_work_time;
278     float d_avg_throughput;
279     float d_pc_counter;
280 
281     block_detail(unsigned int ninputs, unsigned int noutputs);
282 
283     friend struct tpb_detail;
284 
285     friend GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs,
286                                                               unsigned int noutputs);
287 };
288 
289 GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs,
290                                                    unsigned int noutputs);
291 
292 GR_RUNTIME_API long block_detail_ncurrently_allocated();
293 
294 } /* namespace gr */
295 
296 #endif /* INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H */
297