1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include <gnuradio/block_detail.h>
28 #include <gnuradio/buffer.h>
29 #include <iostream>
30 
31 namespace gr {
32 
// Number of block_detail objects currently alive. The constructor increments
// it and the destructor decrements it.
static long s_ncurrently_allocated = 0;

/*!
 * \brief Report how many block_detail objects are currently allocated.
 * \return the present value of the file-local live-instance counter.
 */
long block_detail_ncurrently_allocated()
{
    const long live_count = s_ncurrently_allocated;
    return live_count;
}
36 
block_detail(unsigned int ninputs,unsigned int noutputs)37 block_detail::block_detail(unsigned int ninputs, unsigned int noutputs)
38     : d_produce_or(0),
39       d_ninputs(ninputs),
40       d_noutputs(noutputs),
41       d_input(ninputs),
42       d_output(noutputs),
43       d_done(false),
44       d_ins_noutput_items(0),
45       d_avg_noutput_items(0),
46       d_var_noutput_items(0),
47       d_total_noutput_items(0),
48       d_ins_nproduced(0),
49       d_avg_nproduced(0),
50       d_var_nproduced(0),
51       d_ins_input_buffers_full(ninputs, 0),
52       d_avg_input_buffers_full(ninputs, 0),
53       d_var_input_buffers_full(ninputs, 0),
54       d_ins_output_buffers_full(noutputs, 0),
55       d_avg_output_buffers_full(noutputs, 0),
56       d_var_output_buffers_full(noutputs, 0),
57       d_ins_work_time(0),
58       d_avg_work_time(0),
59       d_var_work_time(0),
60       d_avg_throughput(0),
61       d_pc_counter(0)
62 {
63     s_ncurrently_allocated++;
64     d_pc_start_time = gr::high_res_timer_now();
65 }
66 
~block_detail()67 block_detail::~block_detail()
68 {
69     // should take care of itself
70     s_ncurrently_allocated--;
71 }
72 
set_input(unsigned int which,buffer_reader_sptr reader)73 void block_detail::set_input(unsigned int which, buffer_reader_sptr reader)
74 {
75     if (which >= d_ninputs)
76         throw std::invalid_argument("block_detail::set_input");
77 
78     d_input[which] = reader;
79 }
80 
set_output(unsigned int which,buffer_sptr buffer)81 void block_detail::set_output(unsigned int which, buffer_sptr buffer)
82 {
83     if (which >= d_noutputs)
84         throw std::invalid_argument("block_detail::set_output");
85 
86     d_output[which] = buffer;
87 }
88 
make_block_detail(unsigned int ninputs,unsigned int noutputs)89 block_detail_sptr make_block_detail(unsigned int ninputs, unsigned int noutputs)
90 {
91     return block_detail_sptr(new block_detail(ninputs, noutputs));
92 }
93 
set_done(bool done)94 void block_detail::set_done(bool done)
95 {
96     d_done = done;
97     for (unsigned int i = 0; i < d_noutputs; i++)
98         d_output[i]->set_done(done);
99 
100     for (unsigned int i = 0; i < d_ninputs; i++)
101         d_input[i]->set_done(done);
102 }
103 
consume(int which_input,int how_many_items)104 void block_detail::consume(int which_input, int how_many_items)
105 {
106     d_consumed = how_many_items;
107     if (how_many_items > 0) {
108         input(which_input)->update_read_pointer(how_many_items);
109     }
110 }
111 
consumed() const112 int block_detail::consumed() const { return d_consumed; }
113 
consume_each(int how_many_items)114 void block_detail::consume_each(int how_many_items)
115 {
116     d_consumed = how_many_items;
117     if (how_many_items > 0) {
118         for (int i = 0; i < ninputs(); i++) {
119             d_input[i]->update_read_pointer(how_many_items);
120         }
121     }
122 }
123 
produce(int which_output,int how_many_items)124 void block_detail::produce(int which_output, int how_many_items)
125 {
126     if (how_many_items > 0) {
127         d_output[which_output]->update_write_pointer(how_many_items);
128         d_produce_or |= how_many_items;
129     }
130 }
131 
produce_each(int how_many_items)132 void block_detail::produce_each(int how_many_items)
133 {
134     if (how_many_items > 0) {
135         for (int i = 0; i < noutputs(); i++) {
136             d_output[i]->update_write_pointer(how_many_items);
137         }
138         d_produce_or |= how_many_items;
139     }
140 }
141 
nitems_read(unsigned int which_input)142 uint64_t block_detail::nitems_read(unsigned int which_input)
143 {
144     if (which_input >= d_ninputs)
145         throw std::invalid_argument("block_detail::n_input_items");
146     return d_input[which_input]->nitems_read();
147 }
148 
nitems_written(unsigned int which_output)149 uint64_t block_detail::nitems_written(unsigned int which_output)
150 {
151     if (which_output >= d_noutputs)
152         throw std::invalid_argument("block_detail::n_output_items");
153     return d_output[which_output]->nitems_written();
154 }
155 
reset_nitem_counters()156 void block_detail::reset_nitem_counters()
157 {
158     for (unsigned int i = 0; i < d_ninputs; i++) {
159         d_input[i]->reset_nitem_counter();
160     }
161     for (unsigned int o = 0; o < d_noutputs; o++) {
162         d_output[o]->reset_nitem_counter();
163     }
164 }
165 
clear_tags()166 void block_detail::clear_tags()
167 {
168     for (unsigned int i = 0; i < d_ninputs; i++) {
169         uint64_t max_time = 0xFFFFFFFFFFFFFFFF; // from now to the end of time
170         d_input[i]->buffer()->prune_tags(max_time);
171     }
172 }
173 
add_item_tag(unsigned int which_output,const tag_t & tag)174 void block_detail::add_item_tag(unsigned int which_output, const tag_t& tag)
175 {
176     if (!pmt::is_symbol(tag.key)) {
177         throw pmt::wrong_type("block_detail::add_item_tag key", tag.key);
178     } else {
179         // Add tag to gr_buffer's deque tags
180         d_output[which_output]->add_item_tag(tag);
181     }
182 }
183 
remove_item_tag(unsigned int which_input,const tag_t & tag,long id)184 void block_detail::remove_item_tag(unsigned int which_input, const tag_t& tag, long id)
185 {
186     if (!pmt::is_symbol(tag.key)) {
187         throw pmt::wrong_type("block_detail::add_item_tag key", tag.key);
188     } else {
189         // Add tag to gr_buffer's deque tags
190         d_input[which_input]->buffer()->remove_item_tag(tag, id);
191     }
192 }
193 
get_tags_in_range(std::vector<tag_t> & v,unsigned int which_input,uint64_t abs_start,uint64_t abs_end,long id)194 void block_detail::get_tags_in_range(std::vector<tag_t>& v,
195                                      unsigned int which_input,
196                                      uint64_t abs_start,
197                                      uint64_t abs_end,
198                                      long id)
199 {
200     // get from gr_buffer_reader's deque of tags
201     d_input[which_input]->get_tags_in_range(v, abs_start, abs_end, id);
202 }
203 
get_tags_in_range(std::vector<tag_t> & v,unsigned int which_input,uint64_t abs_start,uint64_t abs_end,const pmt::pmt_t & key,long id)204 void block_detail::get_tags_in_range(std::vector<tag_t>& v,
205                                      unsigned int which_input,
206                                      uint64_t abs_start,
207                                      uint64_t abs_end,
208                                      const pmt::pmt_t& key,
209                                      long id)
210 {
211     std::vector<tag_t> found_items;
212 
213     v.resize(0);
214 
215     // get from gr_buffer_reader's deque of tags
216     d_input[which_input]->get_tags_in_range(found_items, abs_start, abs_end, id);
217 
218     // Filter further by key name
219     pmt::pmt_t itemkey;
220     std::vector<tag_t>::iterator itr;
221     for (itr = found_items.begin(); itr != found_items.end(); itr++) {
222         itemkey = (*itr).key;
223         if (pmt::eqv(key, itemkey)) {
224             v.push_back(*itr);
225         }
226     }
227 }
228 
set_processor_affinity(const std::vector<int> & mask)229 void block_detail::set_processor_affinity(const std::vector<int>& mask)
230 {
231     if (threaded) {
232         try {
233             gr::thread::thread_bind_to_processor(thread, mask);
234         } catch (std::runtime_error& e) {
235             std::cerr << "set_processor_affinity: invalid mask." << std::endl;
236         }
237     }
238 }
239 
unset_processor_affinity()240 void block_detail::unset_processor_affinity()
241 {
242     if (threaded) {
243         gr::thread::thread_unbind(thread);
244     }
245 }
246 
thread_priority()247 int block_detail::thread_priority()
248 {
249     if (threaded) {
250         return gr::thread::thread_priority(thread);
251     }
252     return -1;
253 }
254 
set_thread_priority(int priority)255 int block_detail::set_thread_priority(int priority)
256 {
257     if (threaded) {
258         return gr::thread::set_thread_priority(thread, priority);
259     }
260     return -1;
261 }
262 
start_perf_counters()263 void block_detail::start_perf_counters()
264 {
265     d_start_of_work = gr::high_res_timer_now_perfmon();
266 }
267 
stop_perf_counters(int noutput_items,int nproduced)268 void block_detail::stop_perf_counters(int noutput_items, int nproduced)
269 {
270     d_end_of_work = gr::high_res_timer_now_perfmon();
271     gr::high_res_timer_type diff = d_end_of_work - d_start_of_work;
272 
273     if (d_pc_counter == 0) {
274         d_ins_work_time = diff;
275         d_avg_work_time = diff;
276         d_var_work_time = 0;
277         d_total_work_time = diff;
278         d_ins_nproduced = nproduced;
279         d_avg_nproduced = nproduced;
280         d_var_nproduced = 0;
281         d_ins_noutput_items = noutput_items;
282         d_avg_noutput_items = noutput_items;
283         d_var_noutput_items = 0;
284         d_total_noutput_items = noutput_items;
285         d_pc_start_time = (float)gr::high_res_timer_now();
286         for (size_t i = 0; i < d_input.size(); i++) {
287             buffer_reader_sptr in_buf = d_input[i];
288             gr::thread::scoped_lock guard(*in_buf->mutex());
289             float pfull = static_cast<float>(in_buf->items_available()) /
290                           static_cast<float>(in_buf->max_possible_items_available());
291             d_ins_input_buffers_full[i] = pfull;
292             d_avg_input_buffers_full[i] = pfull;
293             d_var_input_buffers_full[i] = 0;
294         }
295         for (size_t i = 0; i < d_output.size(); i++) {
296             buffer_sptr out_buf = d_output[i];
297             gr::thread::scoped_lock guard(*out_buf->mutex());
298             float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
299                                      static_cast<float>(out_buf->bufsize());
300             d_ins_output_buffers_full[i] = pfull;
301             d_avg_output_buffers_full[i] = pfull;
302             d_var_output_buffers_full[i] = 0;
303         }
304     } else {
305         float d = diff - d_avg_work_time;
306         d_ins_work_time = diff;
307         d_avg_work_time = d_avg_work_time + d / d_pc_counter;
308         d_var_work_time = d_var_work_time + d * d;
309         d_total_work_time += diff;
310 
311         d = nproduced - d_avg_nproduced;
312         d_ins_nproduced = nproduced;
313         d_avg_nproduced = d_avg_nproduced + d / d_pc_counter;
314         d_var_nproduced = d_var_nproduced + d * d;
315 
316         d = noutput_items - d_avg_noutput_items;
317         d_ins_noutput_items = noutput_items;
318         d_avg_noutput_items = d_avg_noutput_items + d / d_pc_counter;
319         d_var_noutput_items = d_var_noutput_items + d * d;
320         d_total_noutput_items += noutput_items;
321         d_pc_last_work_time = gr::high_res_timer_now();
322         float monitor_time = (float)(d_pc_last_work_time - d_pc_start_time) /
323                              (float)gr::high_res_timer_tps();
324         d_avg_throughput = d_total_noutput_items / monitor_time;
325 
326         for (size_t i = 0; i < d_input.size(); i++) {
327             buffer_reader_sptr in_buf = d_input[i];
328             gr::thread::scoped_lock guard(*in_buf->mutex());
329             float pfull = static_cast<float>(in_buf->items_available()) /
330                           static_cast<float>(in_buf->max_possible_items_available());
331 
332             d = pfull - d_avg_input_buffers_full[i];
333             d_ins_input_buffers_full[i] = pfull;
334             d_avg_input_buffers_full[i] = d_avg_input_buffers_full[i] + d / d_pc_counter;
335             d_var_input_buffers_full[i] = d_var_input_buffers_full[i] + d * d;
336         }
337 
338         for (size_t i = 0; i < d_output.size(); i++) {
339             buffer_sptr out_buf = d_output[i];
340             gr::thread::scoped_lock guard(*out_buf->mutex());
341             float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
342                                      static_cast<float>(out_buf->bufsize());
343 
344             d = pfull - d_avg_output_buffers_full[i];
345             d_ins_output_buffers_full[i] = pfull;
346             d_avg_output_buffers_full[i] =
347                 d_avg_output_buffers_full[i] + d / d_pc_counter;
348             d_var_output_buffers_full[i] = d_var_output_buffers_full[i] + d * d;
349         }
350     }
351 
352     d_pc_counter++;
353 }
354 
reset_perf_counters()355 void block_detail::reset_perf_counters() { d_pc_counter = 0; }
356 
pc_noutput_items()357 float block_detail::pc_noutput_items() { return d_ins_noutput_items; }
358 
pc_nproduced()359 float block_detail::pc_nproduced() { return d_ins_nproduced; }
360 
pc_input_buffers_full(size_t which)361 float block_detail::pc_input_buffers_full(size_t which)
362 {
363     if (which < d_ins_input_buffers_full.size())
364         return d_ins_input_buffers_full[which];
365     else
366         return 0;
367 }
368 
pc_input_buffers_full()369 std::vector<float> block_detail::pc_input_buffers_full()
370 {
371     return d_ins_input_buffers_full;
372 }
373 
pc_output_buffers_full(size_t which)374 float block_detail::pc_output_buffers_full(size_t which)
375 {
376     if (which < d_ins_output_buffers_full.size())
377         return d_ins_output_buffers_full[which];
378     else
379         return 0;
380 }
381 
pc_output_buffers_full()382 std::vector<float> block_detail::pc_output_buffers_full()
383 {
384     return d_ins_output_buffers_full;
385 }
386 
pc_work_time()387 float block_detail::pc_work_time() { return d_ins_work_time; }
388 
pc_noutput_items_avg()389 float block_detail::pc_noutput_items_avg() { return d_avg_noutput_items; }
390 
pc_nproduced_avg()391 float block_detail::pc_nproduced_avg() { return d_avg_nproduced; }
392 
pc_input_buffers_full_avg(size_t which)393 float block_detail::pc_input_buffers_full_avg(size_t which)
394 {
395     if (which < d_avg_input_buffers_full.size())
396         return d_avg_input_buffers_full[which];
397     else
398         return 0;
399 }
400 
pc_input_buffers_full_avg()401 std::vector<float> block_detail::pc_input_buffers_full_avg()
402 {
403     return d_avg_input_buffers_full;
404 }
405 
pc_output_buffers_full_avg(size_t which)406 float block_detail::pc_output_buffers_full_avg(size_t which)
407 {
408     if (which < d_avg_output_buffers_full.size())
409         return d_avg_output_buffers_full[which];
410     else
411         return 0;
412 }
413 
pc_output_buffers_full_avg()414 std::vector<float> block_detail::pc_output_buffers_full_avg()
415 {
416     return d_avg_output_buffers_full;
417 }
418 
pc_work_time_avg()419 float block_detail::pc_work_time_avg() { return d_avg_work_time; }
420 
pc_noutput_items_var()421 float block_detail::pc_noutput_items_var()
422 {
423     return d_var_noutput_items / (d_pc_counter - 1);
424 }
425 
pc_nproduced_var()426 float block_detail::pc_nproduced_var() { return d_var_nproduced / (d_pc_counter - 1); }
427 
pc_input_buffers_full_var(size_t which)428 float block_detail::pc_input_buffers_full_var(size_t which)
429 {
430     if (which < d_avg_input_buffers_full.size())
431         return d_var_input_buffers_full[which] / (d_pc_counter - 1);
432     else
433         return 0;
434 }
435 
pc_input_buffers_full_var()436 std::vector<float> block_detail::pc_input_buffers_full_var()
437 {
438     std::vector<float> var(d_avg_input_buffers_full.size(), 0);
439     for (size_t i = 0; i < d_avg_input_buffers_full.size(); i++)
440         var[i] = d_avg_input_buffers_full[i] / (d_pc_counter - 1);
441     return var;
442 }
443 
pc_output_buffers_full_var(size_t which)444 float block_detail::pc_output_buffers_full_var(size_t which)
445 {
446     if (which < d_avg_output_buffers_full.size())
447         return d_var_output_buffers_full[which] / (d_pc_counter - 1);
448     else
449         return 0;
450 }
451 
pc_output_buffers_full_var()452 std::vector<float> block_detail::pc_output_buffers_full_var()
453 {
454     std::vector<float> var(d_avg_output_buffers_full.size(), 0);
455     for (size_t i = 0; i < d_avg_output_buffers_full.size(); i++)
456         var[i] = d_avg_output_buffers_full[i] / (d_pc_counter - 1);
457     return var;
458 }
459 
pc_work_time_var()460 float block_detail::pc_work_time_var() { return d_var_work_time / (d_pc_counter - 1); }
461 
pc_work_time_total()462 float block_detail::pc_work_time_total() { return d_total_work_time; }
463 
pc_throughput_avg()464 float block_detail::pc_throughput_avg() { return d_avg_throughput; }
465 } /* namespace gr */
466