1 /* -*- c++ -*- */
2 /*
3 * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gnuradio/block_detail.h>
28 #include <gnuradio/buffer.h>
29 #include <iostream>
30
31 namespace gr {
32
// Count of block_detail objects currently alive. The constructor increments
// it and the destructor decrements it.
static long s_ncurrently_allocated = 0;

// Report the current value of the live block_detail counter.
long block_detail_ncurrently_allocated()
{
    const long live_count = s_ncurrently_allocated;
    return live_count;
}
36
block_detail(unsigned int ninputs,unsigned int noutputs)37 block_detail::block_detail(unsigned int ninputs, unsigned int noutputs)
38 : d_produce_or(0),
39 d_ninputs(ninputs),
40 d_noutputs(noutputs),
41 d_input(ninputs),
42 d_output(noutputs),
43 d_done(false),
44 d_ins_noutput_items(0),
45 d_avg_noutput_items(0),
46 d_var_noutput_items(0),
47 d_total_noutput_items(0),
48 d_ins_nproduced(0),
49 d_avg_nproduced(0),
50 d_var_nproduced(0),
51 d_ins_input_buffers_full(ninputs, 0),
52 d_avg_input_buffers_full(ninputs, 0),
53 d_var_input_buffers_full(ninputs, 0),
54 d_ins_output_buffers_full(noutputs, 0),
55 d_avg_output_buffers_full(noutputs, 0),
56 d_var_output_buffers_full(noutputs, 0),
57 d_ins_work_time(0),
58 d_avg_work_time(0),
59 d_var_work_time(0),
60 d_avg_throughput(0),
61 d_pc_counter(0)
62 {
63 s_ncurrently_allocated++;
64 d_pc_start_time = gr::high_res_timer_now();
65 }
66
~block_detail()67 block_detail::~block_detail()
68 {
69 // should take care of itself
70 s_ncurrently_allocated--;
71 }
72
set_input(unsigned int which,buffer_reader_sptr reader)73 void block_detail::set_input(unsigned int which, buffer_reader_sptr reader)
74 {
75 if (which >= d_ninputs)
76 throw std::invalid_argument("block_detail::set_input");
77
78 d_input[which] = reader;
79 }
80
set_output(unsigned int which,buffer_sptr buffer)81 void block_detail::set_output(unsigned int which, buffer_sptr buffer)
82 {
83 if (which >= d_noutputs)
84 throw std::invalid_argument("block_detail::set_output");
85
86 d_output[which] = buffer;
87 }
88
make_block_detail(unsigned int ninputs,unsigned int noutputs)89 block_detail_sptr make_block_detail(unsigned int ninputs, unsigned int noutputs)
90 {
91 return block_detail_sptr(new block_detail(ninputs, noutputs));
92 }
93
set_done(bool done)94 void block_detail::set_done(bool done)
95 {
96 d_done = done;
97 for (unsigned int i = 0; i < d_noutputs; i++)
98 d_output[i]->set_done(done);
99
100 for (unsigned int i = 0; i < d_ninputs; i++)
101 d_input[i]->set_done(done);
102 }
103
consume(int which_input,int how_many_items)104 void block_detail::consume(int which_input, int how_many_items)
105 {
106 d_consumed = how_many_items;
107 if (how_many_items > 0) {
108 input(which_input)->update_read_pointer(how_many_items);
109 }
110 }
111
consumed() const112 int block_detail::consumed() const { return d_consumed; }
113
consume_each(int how_many_items)114 void block_detail::consume_each(int how_many_items)
115 {
116 d_consumed = how_many_items;
117 if (how_many_items > 0) {
118 for (int i = 0; i < ninputs(); i++) {
119 d_input[i]->update_read_pointer(how_many_items);
120 }
121 }
122 }
123
produce(int which_output,int how_many_items)124 void block_detail::produce(int which_output, int how_many_items)
125 {
126 if (how_many_items > 0) {
127 d_output[which_output]->update_write_pointer(how_many_items);
128 d_produce_or |= how_many_items;
129 }
130 }
131
produce_each(int how_many_items)132 void block_detail::produce_each(int how_many_items)
133 {
134 if (how_many_items > 0) {
135 for (int i = 0; i < noutputs(); i++) {
136 d_output[i]->update_write_pointer(how_many_items);
137 }
138 d_produce_or |= how_many_items;
139 }
140 }
141
nitems_read(unsigned int which_input)142 uint64_t block_detail::nitems_read(unsigned int which_input)
143 {
144 if (which_input >= d_ninputs)
145 throw std::invalid_argument("block_detail::n_input_items");
146 return d_input[which_input]->nitems_read();
147 }
148
nitems_written(unsigned int which_output)149 uint64_t block_detail::nitems_written(unsigned int which_output)
150 {
151 if (which_output >= d_noutputs)
152 throw std::invalid_argument("block_detail::n_output_items");
153 return d_output[which_output]->nitems_written();
154 }
155
reset_nitem_counters()156 void block_detail::reset_nitem_counters()
157 {
158 for (unsigned int i = 0; i < d_ninputs; i++) {
159 d_input[i]->reset_nitem_counter();
160 }
161 for (unsigned int o = 0; o < d_noutputs; o++) {
162 d_output[o]->reset_nitem_counter();
163 }
164 }
165
clear_tags()166 void block_detail::clear_tags()
167 {
168 for (unsigned int i = 0; i < d_ninputs; i++) {
169 uint64_t max_time = 0xFFFFFFFFFFFFFFFF; // from now to the end of time
170 d_input[i]->buffer()->prune_tags(max_time);
171 }
172 }
173
add_item_tag(unsigned int which_output,const tag_t & tag)174 void block_detail::add_item_tag(unsigned int which_output, const tag_t& tag)
175 {
176 if (!pmt::is_symbol(tag.key)) {
177 throw pmt::wrong_type("block_detail::add_item_tag key", tag.key);
178 } else {
179 // Add tag to gr_buffer's deque tags
180 d_output[which_output]->add_item_tag(tag);
181 }
182 }
183
remove_item_tag(unsigned int which_input,const tag_t & tag,long id)184 void block_detail::remove_item_tag(unsigned int which_input, const tag_t& tag, long id)
185 {
186 if (!pmt::is_symbol(tag.key)) {
187 throw pmt::wrong_type("block_detail::add_item_tag key", tag.key);
188 } else {
189 // Add tag to gr_buffer's deque tags
190 d_input[which_input]->buffer()->remove_item_tag(tag, id);
191 }
192 }
193
get_tags_in_range(std::vector<tag_t> & v,unsigned int which_input,uint64_t abs_start,uint64_t abs_end,long id)194 void block_detail::get_tags_in_range(std::vector<tag_t>& v,
195 unsigned int which_input,
196 uint64_t abs_start,
197 uint64_t abs_end,
198 long id)
199 {
200 // get from gr_buffer_reader's deque of tags
201 d_input[which_input]->get_tags_in_range(v, abs_start, abs_end, id);
202 }
203
get_tags_in_range(std::vector<tag_t> & v,unsigned int which_input,uint64_t abs_start,uint64_t abs_end,const pmt::pmt_t & key,long id)204 void block_detail::get_tags_in_range(std::vector<tag_t>& v,
205 unsigned int which_input,
206 uint64_t abs_start,
207 uint64_t abs_end,
208 const pmt::pmt_t& key,
209 long id)
210 {
211 std::vector<tag_t> found_items;
212
213 v.resize(0);
214
215 // get from gr_buffer_reader's deque of tags
216 d_input[which_input]->get_tags_in_range(found_items, abs_start, abs_end, id);
217
218 // Filter further by key name
219 pmt::pmt_t itemkey;
220 std::vector<tag_t>::iterator itr;
221 for (itr = found_items.begin(); itr != found_items.end(); itr++) {
222 itemkey = (*itr).key;
223 if (pmt::eqv(key, itemkey)) {
224 v.push_back(*itr);
225 }
226 }
227 }
228
set_processor_affinity(const std::vector<int> & mask)229 void block_detail::set_processor_affinity(const std::vector<int>& mask)
230 {
231 if (threaded) {
232 try {
233 gr::thread::thread_bind_to_processor(thread, mask);
234 } catch (std::runtime_error& e) {
235 std::cerr << "set_processor_affinity: invalid mask." << std::endl;
236 }
237 }
238 }
239
unset_processor_affinity()240 void block_detail::unset_processor_affinity()
241 {
242 if (threaded) {
243 gr::thread::thread_unbind(thread);
244 }
245 }
246
thread_priority()247 int block_detail::thread_priority()
248 {
249 if (threaded) {
250 return gr::thread::thread_priority(thread);
251 }
252 return -1;
253 }
254
set_thread_priority(int priority)255 int block_detail::set_thread_priority(int priority)
256 {
257 if (threaded) {
258 return gr::thread::set_thread_priority(thread, priority);
259 }
260 return -1;
261 }
262
start_perf_counters()263 void block_detail::start_perf_counters()
264 {
265 d_start_of_work = gr::high_res_timer_now_perfmon();
266 }
267
stop_perf_counters(int noutput_items,int nproduced)268 void block_detail::stop_perf_counters(int noutput_items, int nproduced)
269 {
270 d_end_of_work = gr::high_res_timer_now_perfmon();
271 gr::high_res_timer_type diff = d_end_of_work - d_start_of_work;
272
273 if (d_pc_counter == 0) {
274 d_ins_work_time = diff;
275 d_avg_work_time = diff;
276 d_var_work_time = 0;
277 d_total_work_time = diff;
278 d_ins_nproduced = nproduced;
279 d_avg_nproduced = nproduced;
280 d_var_nproduced = 0;
281 d_ins_noutput_items = noutput_items;
282 d_avg_noutput_items = noutput_items;
283 d_var_noutput_items = 0;
284 d_total_noutput_items = noutput_items;
285 d_pc_start_time = (float)gr::high_res_timer_now();
286 for (size_t i = 0; i < d_input.size(); i++) {
287 buffer_reader_sptr in_buf = d_input[i];
288 gr::thread::scoped_lock guard(*in_buf->mutex());
289 float pfull = static_cast<float>(in_buf->items_available()) /
290 static_cast<float>(in_buf->max_possible_items_available());
291 d_ins_input_buffers_full[i] = pfull;
292 d_avg_input_buffers_full[i] = pfull;
293 d_var_input_buffers_full[i] = 0;
294 }
295 for (size_t i = 0; i < d_output.size(); i++) {
296 buffer_sptr out_buf = d_output[i];
297 gr::thread::scoped_lock guard(*out_buf->mutex());
298 float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
299 static_cast<float>(out_buf->bufsize());
300 d_ins_output_buffers_full[i] = pfull;
301 d_avg_output_buffers_full[i] = pfull;
302 d_var_output_buffers_full[i] = 0;
303 }
304 } else {
305 float d = diff - d_avg_work_time;
306 d_ins_work_time = diff;
307 d_avg_work_time = d_avg_work_time + d / d_pc_counter;
308 d_var_work_time = d_var_work_time + d * d;
309 d_total_work_time += diff;
310
311 d = nproduced - d_avg_nproduced;
312 d_ins_nproduced = nproduced;
313 d_avg_nproduced = d_avg_nproduced + d / d_pc_counter;
314 d_var_nproduced = d_var_nproduced + d * d;
315
316 d = noutput_items - d_avg_noutput_items;
317 d_ins_noutput_items = noutput_items;
318 d_avg_noutput_items = d_avg_noutput_items + d / d_pc_counter;
319 d_var_noutput_items = d_var_noutput_items + d * d;
320 d_total_noutput_items += noutput_items;
321 d_pc_last_work_time = gr::high_res_timer_now();
322 float monitor_time = (float)(d_pc_last_work_time - d_pc_start_time) /
323 (float)gr::high_res_timer_tps();
324 d_avg_throughput = d_total_noutput_items / monitor_time;
325
326 for (size_t i = 0; i < d_input.size(); i++) {
327 buffer_reader_sptr in_buf = d_input[i];
328 gr::thread::scoped_lock guard(*in_buf->mutex());
329 float pfull = static_cast<float>(in_buf->items_available()) /
330 static_cast<float>(in_buf->max_possible_items_available());
331
332 d = pfull - d_avg_input_buffers_full[i];
333 d_ins_input_buffers_full[i] = pfull;
334 d_avg_input_buffers_full[i] = d_avg_input_buffers_full[i] + d / d_pc_counter;
335 d_var_input_buffers_full[i] = d_var_input_buffers_full[i] + d * d;
336 }
337
338 for (size_t i = 0; i < d_output.size(); i++) {
339 buffer_sptr out_buf = d_output[i];
340 gr::thread::scoped_lock guard(*out_buf->mutex());
341 float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
342 static_cast<float>(out_buf->bufsize());
343
344 d = pfull - d_avg_output_buffers_full[i];
345 d_ins_output_buffers_full[i] = pfull;
346 d_avg_output_buffers_full[i] =
347 d_avg_output_buffers_full[i] + d / d_pc_counter;
348 d_var_output_buffers_full[i] = d_var_output_buffers_full[i] + d * d;
349 }
350 }
351
352 d_pc_counter++;
353 }
354
reset_perf_counters()355 void block_detail::reset_perf_counters() { d_pc_counter = 0; }
356
pc_noutput_items()357 float block_detail::pc_noutput_items() { return d_ins_noutput_items; }
358
pc_nproduced()359 float block_detail::pc_nproduced() { return d_ins_nproduced; }
360
pc_input_buffers_full(size_t which)361 float block_detail::pc_input_buffers_full(size_t which)
362 {
363 if (which < d_ins_input_buffers_full.size())
364 return d_ins_input_buffers_full[which];
365 else
366 return 0;
367 }
368
pc_input_buffers_full()369 std::vector<float> block_detail::pc_input_buffers_full()
370 {
371 return d_ins_input_buffers_full;
372 }
373
pc_output_buffers_full(size_t which)374 float block_detail::pc_output_buffers_full(size_t which)
375 {
376 if (which < d_ins_output_buffers_full.size())
377 return d_ins_output_buffers_full[which];
378 else
379 return 0;
380 }
381
pc_output_buffers_full()382 std::vector<float> block_detail::pc_output_buffers_full()
383 {
384 return d_ins_output_buffers_full;
385 }
386
pc_work_time()387 float block_detail::pc_work_time() { return d_ins_work_time; }
388
pc_noutput_items_avg()389 float block_detail::pc_noutput_items_avg() { return d_avg_noutput_items; }
390
pc_nproduced_avg()391 float block_detail::pc_nproduced_avg() { return d_avg_nproduced; }
392
pc_input_buffers_full_avg(size_t which)393 float block_detail::pc_input_buffers_full_avg(size_t which)
394 {
395 if (which < d_avg_input_buffers_full.size())
396 return d_avg_input_buffers_full[which];
397 else
398 return 0;
399 }
400
pc_input_buffers_full_avg()401 std::vector<float> block_detail::pc_input_buffers_full_avg()
402 {
403 return d_avg_input_buffers_full;
404 }
405
pc_output_buffers_full_avg(size_t which)406 float block_detail::pc_output_buffers_full_avg(size_t which)
407 {
408 if (which < d_avg_output_buffers_full.size())
409 return d_avg_output_buffers_full[which];
410 else
411 return 0;
412 }
413
pc_output_buffers_full_avg()414 std::vector<float> block_detail::pc_output_buffers_full_avg()
415 {
416 return d_avg_output_buffers_full;
417 }
418
pc_work_time_avg()419 float block_detail::pc_work_time_avg() { return d_avg_work_time; }
420
pc_noutput_items_var()421 float block_detail::pc_noutput_items_var()
422 {
423 return d_var_noutput_items / (d_pc_counter - 1);
424 }
425
pc_nproduced_var()426 float block_detail::pc_nproduced_var() { return d_var_nproduced / (d_pc_counter - 1); }
427
pc_input_buffers_full_var(size_t which)428 float block_detail::pc_input_buffers_full_var(size_t which)
429 {
430 if (which < d_avg_input_buffers_full.size())
431 return d_var_input_buffers_full[which] / (d_pc_counter - 1);
432 else
433 return 0;
434 }
435
pc_input_buffers_full_var()436 std::vector<float> block_detail::pc_input_buffers_full_var()
437 {
438 std::vector<float> var(d_avg_input_buffers_full.size(), 0);
439 for (size_t i = 0; i < d_avg_input_buffers_full.size(); i++)
440 var[i] = d_avg_input_buffers_full[i] / (d_pc_counter - 1);
441 return var;
442 }
443
pc_output_buffers_full_var(size_t which)444 float block_detail::pc_output_buffers_full_var(size_t which)
445 {
446 if (which < d_avg_output_buffers_full.size())
447 return d_var_output_buffers_full[which] / (d_pc_counter - 1);
448 else
449 return 0;
450 }
451
pc_output_buffers_full_var()452 std::vector<float> block_detail::pc_output_buffers_full_var()
453 {
454 std::vector<float> var(d_avg_output_buffers_full.size(), 0);
455 for (size_t i = 0; i < d_avg_output_buffers_full.size(); i++)
456 var[i] = d_avg_output_buffers_full[i] / (d_pc_counter - 1);
457 return var;
458 }
459
pc_work_time_var()460 float block_detail::pc_work_time_var() { return d_var_work_time / (d_pc_counter - 1); }
461
pc_work_time_total()462 float block_detail::pc_work_time_total() { return d_total_work_time; }
463
pc_throughput_avg()464 float block_detail::pc_throughput_avg() { return d_avg_throughput; }
465 } /* namespace gr */
466