1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2008-2010,2013,2017 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include <gnuradio/block.h>
28 #include <gnuradio/block_detail.h>
29 #include <gnuradio/buffer.h>
30 #include <gnuradio/prefs.h>
31 #include <assert.h>
32 #include <block_executor.h>
33 #include <stdio.h>
34 #include <boost/format.hpp>
35 #include <boost/thread.hpp>
36 #include <iostream>
37 #include <limits>
38 
39 namespace gr {
40 
// ENABLE_LOGGING must be 0 or 1: it is tested by the preprocessor below and
// also used as an ordinary expression inside run-time code.
#define ENABLE_LOGGING 0

// LOG(stmt) runs `stmt` only when logging is enabled and otherwise expands to
// an empty statement. The do { } while (0) wrapper lets the macro be used
// anywhere a single statement is expected (e.g. an unbraced if/else arm).
#if ENABLE_LOGGING
#define LOG(x) do { x; } while (0)
#else
#define LOG(x) do { } while (0)
#endif

// Incremented each time an executor opens a log file, to build unique
// per-executor log file names.
static int which_scheduler = 0;
57 
// Round `n` up to the next multiple of `multiple` (n itself if already a
// multiple). `multiple` must be non-zero; arithmetic is unsigned and wraps.
inline static unsigned int round_up(unsigned int n, unsigned int multiple)
{
    // Padding by (multiple - 1) pushes any partial multiple over the next
    // boundary before the truncating division.
    const unsigned int padded = n + (multiple - 1);
    const unsigned int whole_multiples = padded / multiple;
    return whole_multiples * multiple;
}
62 
// Round `n` down to the previous multiple of `multiple` (n itself if already
// a multiple). `multiple` must be non-zero.
inline static unsigned int round_down(unsigned int n, unsigned int multiple)
{
    // Dropping the remainder leaves the largest multiple not exceeding n.
    return n - n % multiple;
}
67 
68 //
69 // Return minimum available write space in all our downstream
70 // buffers or -1 if we're output blocked and the output we're
71 // blocked on is done.
72 //
73 static int
min_available_space(block_detail * d,int output_multiple,int min_noutput_items)74 min_available_space(block_detail* d, int output_multiple, int min_noutput_items)
75 {
76     int min_space = std::numeric_limits<int>::max();
77     if (min_noutput_items == 0)
78         min_noutput_items = 1;
79     for (int i = 0; i < d->noutputs(); i++) {
80         buffer_sptr out_buf = d->output(i);
81         gr::thread::scoped_lock guard(*out_buf->mutex());
82         int avail_n = round_down(out_buf->space_available(), output_multiple);
83         int best_n = round_down(out_buf->bufsize() / 2, output_multiple);
84         if (best_n < min_noutput_items)
85             throw std::runtime_error("Buffer too small for min_noutput_items");
86         int n = std::min(avail_n, best_n);
87         if (n < min_noutput_items) { // We're blocked on output.
88             if (out_buf->done()) {   // Downstream is done, therefore we're done.
89                 return -1;
90             }
91             return 0;
92         }
93         min_space = std::min(min_space, n);
94     }
95     return min_space;
96 }
97 
propagate_tags(block::tag_propagation_policy_t policy,block_detail * d,const std::vector<uint64_t> & start_nitems_read,double rrate,mpq_class & mp_rrate,bool use_fp_rrate,std::vector<tag_t> & rtags,long block_id)98 static bool propagate_tags(block::tag_propagation_policy_t policy,
99                            block_detail* d,
100                            const std::vector<uint64_t>& start_nitems_read,
101                            double rrate,
102                            mpq_class& mp_rrate,
103                            bool use_fp_rrate,
104                            std::vector<tag_t>& rtags,
105                            long block_id)
106 {
107     static const mpq_class one_half(1, 2);
108 
109     // Move tags downstream
110     // if a sink, we don't need to move downstream
111     if (d->sink_p()) {
112         return true;
113     }
114 
115     switch (policy) {
116     case block::TPP_DONT:
117     case block::TPP_CUSTOM:
118         return true;
119     case block::TPP_ALL_TO_ALL: {
120         // every tag on every input propagates to everyone downstream
121         std::vector<buffer_sptr> out_buf;
122 
123         for (int i = 0; i < d->ninputs(); i++) {
124             d->get_tags_in_range(
125                 rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
126 
127             if (rtags.empty()) {
128                 continue;
129             }
130 
131             if (out_buf.empty()) {
132                 out_buf.reserve(d->noutputs());
133                 for (int o = 0; o < d->noutputs(); o++)
134                     out_buf.push_back(d->output(o));
135             }
136 
137             std::vector<tag_t>::iterator t;
138             if (rrate == 1.0) {
139                 for (t = rtags.begin(); t != rtags.end(); t++) {
140                     for (int o = 0; o < d->noutputs(); o++)
141                         out_buf[o]->add_item_tag(*t);
142                 }
143             } else if (use_fp_rrate) {
144                 for (t = rtags.begin(); t != rtags.end(); t++) {
145                     tag_t new_tag = *t;
146                     new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
147                     for (int o = 0; o < d->noutputs(); o++)
148                         out_buf[o]->add_item_tag(new_tag);
149                 }
150             } else {
151                 mpz_class offset;
152                 for (t = rtags.begin(); t != rtags.end(); t++) {
153                     tag_t new_tag = *t;
154                     mpz_import(offset.get_mpz_t(),
155                                1,
156                                1,
157                                sizeof(new_tag.offset),
158                                0,
159                                0,
160                                &new_tag.offset);
161                     offset = offset * mp_rrate + one_half;
162                     new_tag.offset = offset.get_ui();
163                     for (int o = 0; o < d->noutputs(); o++)
164                         out_buf[o]->add_item_tag(new_tag);
165                 }
166             }
167         }
168     } break;
169     case block::TPP_ONE_TO_ONE:
170         // tags from input i only go to output i
171         // this requires d->ninputs() == d->noutputs; this is checked when this
172         // type of tag-propagation system is selected in block_detail
173         if (d->ninputs() == d->noutputs()) {
174             buffer_sptr out_buf;
175 
176             for (int i = 0; i < d->ninputs(); i++) {
177                 d->get_tags_in_range(
178                     rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
179 
180                 if (rtags.empty()) {
181                     continue;
182                 }
183 
184                 out_buf = d->output(i);
185 
186                 std::vector<tag_t>::iterator t;
187                 if (rrate == 1.0) {
188                     for (t = rtags.begin(); t != rtags.end(); t++) {
189                         out_buf->add_item_tag(*t);
190                     }
191                 } else if (use_fp_rrate) {
192                     for (t = rtags.begin(); t != rtags.end(); t++) {
193                         tag_t new_tag = *t;
194                         new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
195                         out_buf->add_item_tag(new_tag);
196                     }
197                 } else {
198                     mpz_class offset;
199                     for (t = rtags.begin(); t != rtags.end(); t++) {
200                         tag_t new_tag = *t;
201                         mpz_import(offset.get_mpz_t(),
202                                    1,
203                                    1,
204                                    sizeof(new_tag.offset),
205                                    0,
206                                    0,
207                                    &new_tag.offset);
208                         offset = offset * mp_rrate + one_half;
209                         new_tag.offset = offset.get_ui();
210                         out_buf->add_item_tag(new_tag);
211                     }
212                 }
213             }
214         } else {
215             std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' "
216                          "requires ninputs == noutputs"
217                       << std::endl;
218             return false;
219         }
220         break;
221     default:
222         return true;
223     }
224     return true;
225 }
226 
block_executor(block_sptr block,int max_noutput_items)227 block_executor::block_executor(block_sptr block, int max_noutput_items)
228     : d_block(block), d_log(0), d_max_noutput_items(max_noutput_items)
229 {
230     if (ENABLE_LOGGING) {
231         std::string name = str(boost::format("sst-%03d.log") % which_scheduler++);
232         d_log = new std::ofstream(name.c_str());
233         std::unitbuf(*d_log); // make it unbuffered...
234         *d_log << "block_executor: " << d_block << std::endl;
235     }
236 
237 #ifdef GR_PERFORMANCE_COUNTERS
238     prefs* prefs = prefs::singleton();
239     d_use_pc = prefs->get_bool("PerfCounters", "on", false);
240 #endif /* GR_PERFORMANCE_COUNTERS */
241 
242     d_block->start(); // enable any drivers, etc.
243 }
244 
~block_executor()245 block_executor::~block_executor()
246 {
247     if (ENABLE_LOGGING)
248         delete d_log;
249 
250     d_block->stop(); // stop any drivers, etc.
251 }
252 
// Make at most one call to the block's general_work().
//
// First decides how many output items to request:
//  - sources: limited by free space in the output buffers;
//  - sinks:   estimated as the largest input backlog times relative_rate();
//  - others:  limited by output space, then refined with fixed_rate(),
//             max_noutput_items, output alignment and forecast().
// If forecast() reports that some input cannot satisfy the request, the
// request is halved (rounded up to output_multiple) and retried while it is
// still larger than output_multiple.
//
// Returns:
//   READY           - general_work() ran and d_produce_or is non-zero
//   READY_NO_OUTPUT - general_work() ran but produced nothing
//   BLKD_IN         - not enough input to make a call
//   BLKD_OUT        - not enough output space to make a call
//   DONE            - the block is finished; the were_done path also marks
//                     the block_detail as done
block_executor::state block_executor::run_one_iteration()
{
    int noutput_items;
    int max_items_avail;
    int max_noutput_items;
    int new_alignment = 0;    // unaligned count to record after general_work()
    int alignment_state = -1; // -1 unset, 0 was unaligned, 1 became unaligned, 2 aligned

    block* m = d_block.get();
    block_detail* d = m->detail().get();

    LOG(*d_log << std::endl << m);

    // Per-call cap, rounded down to the block's output_multiple.
    max_noutput_items = round_down(d_max_noutput_items, m->output_multiple());

    // The scheduler should never run a block that is already done.
    if (d->done()) {
        assert(0);
        return DONE;
    }

    if (d->source_p()) {
        // Sources have no inputs: clear all input-side bookkeeping.
        d_ninput_items_required.resize(0);
        d_ninput_items.resize(0);
        d_input_items.resize(0);
        d_input_done.resize(0);
        d_output_items.resize(d->noutputs());
        d_start_nitems_read.resize(0);

        // determine the minimum available output space
        noutput_items =
            min_available_space(d, m->output_multiple(), m->min_noutput_items());
        noutput_items = std::min(noutput_items, max_noutput_items);
        LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
        if (noutput_items == -1) // we're done
            goto were_done;

        if (noutput_items == 0) { // we're output blocked
            LOG(*d_log << "  BLKD_OUT\n");
            return BLKD_OUT;
        }

        goto setup_call_to_work; // jump to common code
    }

    else if (d->sink_p()) {
        // Sinks have no outputs: only input-side bookkeeping is needed.
        d_ninput_items_required.resize(d->ninputs());
        d_ninput_items.resize(d->ninputs());
        d_input_items.resize(d->ninputs());
        d_input_done.resize(d->ninputs());
        d_output_items.resize(0);
        d_start_nitems_read.resize(d->ninputs());
        LOG(*d_log << " sink\n");

        max_items_avail = 0;
        for (int i = 0; i < d->ninputs(); i++) {
            {
                /*
                 * Acquire the mutex and grab local copies of items_available and done.
                 */
                buffer_reader_sptr in_buf = d->input(i);
                gr::thread::scoped_lock guard(*in_buf->mutex());
                d_ninput_items[i] = in_buf->items_available();
                d_input_done[i] = in_buf->done();
            }

            LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i]
                       << std::endl);
            LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i]
                       << std::endl);

            // An input that is done and holds fewer than output_multiple
            // items ends the sink.
            if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
                goto were_done;

            max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
        }

        // take a swag at how much output we can sink
        noutput_items = (int)(max_items_avail * m->relative_rate());
        noutput_items = round_down(noutput_items, m->output_multiple());
        noutput_items = std::min(noutput_items, max_noutput_items);
        LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
        LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);

        if (noutput_items == 0) { // we're blocked on input
            LOG(*d_log << "  BLKD_IN\n");
            return BLKD_IN;
        }

        goto try_again; // Jump to code shared with regular case.
    }

    else {
        // do the regular thing
        d_ninput_items_required.resize(d->ninputs());
        d_ninput_items.resize(d->ninputs());
        d_input_items.resize(d->ninputs());
        d_input_done.resize(d->ninputs());
        d_output_items.resize(d->noutputs());
        d_start_nitems_read.resize(d->ninputs());

        max_items_avail = 0;
        for (int i = 0; i < d->ninputs(); i++) {
            {
                /*
                 * Acquire the mutex and grab local copies of items_available and done.
                 */
                buffer_reader_sptr in_buf = d->input(i);
                gr::thread::scoped_lock guard(*in_buf->mutex());
                d_ninput_items[i] = in_buf->items_available();
                d_input_done[i] = in_buf->done();
            }
            max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
        }

        // determine the minimum available output space
        noutput_items =
            min_available_space(d, m->output_multiple(), m->min_noutput_items());
        if (ENABLE_LOGGING) {
            *d_log << " regular ";
            *d_log << m->relative_rate_i() << ":" << m->relative_rate_d() << std::endl;
            *d_log << "  max_items_avail = " << max_items_avail << std::endl;
            *d_log << "  noutput_items = " << noutput_items << std::endl;
        }
        if (noutput_items == -1) // we're done
            goto were_done;

        if (noutput_items == 0) { // we're output blocked
            LOG(*d_log << "  BLKD_OUT\n");
            return BLKD_OUT;
        }

    // Shared with the sink path, and re-entered after halving noutput_items
    // when some input cannot satisfy forecast().
    try_again:
        if (m->fixed_rate()) {
            // try to work it forward starting with max_items_avail.
            // We want to try to consume all the input we've got.
            int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);

            // only test this if we specifically set the output_multiple
            if (m->output_multiple_set())
                reqd_noutput_items = round_down(reqd_noutput_items, m->output_multiple());

            if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
                noutput_items = reqd_noutput_items;

            // if we need this many outputs, overrule the max_noutput_items setting
            max_noutput_items = std::max(m->output_multiple(), max_noutput_items);
        }
        noutput_items = std::min(noutput_items, max_noutput_items);

        // Check if we're still unaligned; use up items until we're
        // aligned again. Otherwise, make sure we set the alignment
        // requirement.
        if (!m->output_multiple_set()) {
            if (m->is_unaligned()) {
                // When unaligned, don't just set noutput_items to the remaining
                // samples to meet alignment; this causes too much overhead in
                // requiring a premature call back here. Set the maximum amount
                // of samples to handle unalignment and get us back aligned.
                if (noutput_items >= m->unaligned()) {
                    noutput_items = round_up(noutput_items, m->alignment()) -
                                    (m->alignment() - m->unaligned());
                    new_alignment = 0;
                } else {
                    new_alignment = m->unaligned() - noutput_items;
                }
                alignment_state = 0;
            } else if (noutput_items < m->alignment()) {
                // if we don't have enough for an aligned call, keep track of
                // misalignment, set unaligned flag, and proceed.
                new_alignment = m->alignment() - noutput_items;
                m->set_unaligned(new_alignment);
                m->set_is_unaligned(true);
                alignment_state = 1;
            } else {
                // enough to round down to the nearest alignment and process.
                noutput_items = round_down(noutput_items, m->alignment());
                m->set_is_unaligned(false);
                alignment_state = 2;
            }
        }

        // ask the block how much input they need to produce noutput_items
        m->forecast(noutput_items, d_ninput_items_required);

        // See if we've got sufficient input available and make sure we
        // didn't overflow on the input.
        // On exit, i < d->ninputs() identifies the first starved input.
        int i;
        for (i = 0; i < d->ninputs(); i++) {
            if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
                break;

            if (d_ninput_items_required[i] < 0) {
                std::cerr << "\nsched: <block " << m->name() << " (" << m->unique_id()
                          << ")>"
                          << " thinks its ninput_items required is "
                          << d_ninput_items_required[i] << " and cannot be negative.\n"
                          << "Some parameterization is wrong. "
                          << "Too large a decimation value?\n\n";
                goto were_done;
            }
        }

        if (i < d->ninputs()) { // not enough input on input[i]
            // if we can, try reducing the size of our output request
            if (noutput_items > m->output_multiple()) {
                noutput_items /= 2;
                noutput_items = round_up(noutput_items, m->output_multiple());
                goto try_again;
            }

            // We're blocked on input
            LOG(*d_log << "  BLKD_IN\n");
            if (d_input_done[i]) // If the upstream block is done, we're done
                goto were_done;

            // Is it possible to ever fulfill this request?
            buffer_reader_sptr in_buf = d->input(i);
            if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
                // Nope, never going to happen...
                std::cerr
                    << "\nsched: <block " << m->name() << " (" << m->unique_id() << ")>"
                    << " is requesting more input data\n"
                    << "  than we can provide.\n"
                    << "  ninput_items_required = " << d_ninput_items_required[i] << "\n"
                    << "  max_possible_items_available = "
                    << in_buf->max_possible_items_available() << "\n"
                    << "  If this is a filter, consider reducing the number of taps.\n";
                goto were_done;
            }

            // If we were made unaligned in this round but return here without
            // processing; reset the unalignment claim before next entry.
            // NOTE(review): only alignment_state == 1 is reset. If a retry via
            // try_again re-entered the is_unaligned() branch (state 0), the
            // unaligned flag set on the first pass is kept -- confirm intended.
            if (alignment_state == 1) {
                m->set_unaligned(0);
                m->set_is_unaligned(false);
            }
            return BLKD_IN;
        }

        // We've got enough data on each input to produce noutput_items.
        // Finish setting up the call to work.
        for (int i = 0; i < d->ninputs(); i++)
            d_input_items[i] = d->input(i)->read_pointer();

    // Common call-to-work path; sources jump straight here.
    setup_call_to_work:

        d->d_produce_or = 0;
        for (int i = 0; i < d->noutputs(); i++)
            d_output_items[i] = d->output(i)->write_pointer();

        // determine where to start looking for new tags
        for (int i = 0; i < d->ninputs(); i++)
            d_start_nitems_read[i] = d->nitems_read(i);

#ifdef GR_PERFORMANCE_COUNTERS
        if (d_use_pc)
            d->start_perf_counters();
#endif /* GR_PERFORMANCE_COUNTERS */

        // Do the actual work of the block
        int n =
            m->general_work(noutput_items, d_ninput_items, d_input_items, d_output_items);

#ifdef GR_PERFORMANCE_COUNTERS
        if (d_use_pc)
            d->stop_perf_counters(noutput_items, n);
#endif /* GR_PERFORMANCE_COUNTERS */

        LOG(*d_log << "  general_work: noutput_items = " << noutput_items
                   << " result = " << n << std::endl);

        // Adjust number of unaligned items left to process
        if (m->is_unaligned()) {
            m->set_unaligned(new_alignment);
            m->set_is_unaligned(m->unaligned() != 0);
        }

        // Now propagate the tags based on the new relative rate
        if (!propagate_tags(m->tag_propagation_policy(),
                            d,
                            d_start_nitems_read,
                            m->relative_rate(),
                            m->mp_relative_rate(),
                            m->update_rate(),
                            d_returned_tags,
                            m->unique_id()))
            goto were_done;

        if (n == block::WORK_DONE)
            goto were_done;

        // WORK_CALLED_PRODUCE means the block already called produce() itself.
        if (n != block::WORK_CALLED_PRODUCE)
            d->produce_each(n); // advance write pointers

        // For some blocks that can change their produce/consume ratio
        // (the relative_rate), we might want to automatically update
        // based on the amount of items written/read.
        // In the block constructor, use enable_update_rate(true).
        if (m->update_rate()) {
            // rrate = ((double)(m->nitems_written(0))) / ((double)m->nitems_read(0));
            // if(rrate > 0.0)
            //  m->set_relative_rate(rrate);
            if ((n > 0) && (d->consumed() > 0))
                m->set_relative_rate((uint64_t)n, (uint64_t)d->consumed());
        }

        if (d->d_produce_or > 0) // block produced something
            return READY;

        // We didn't produce any output even though we called general_work.
        // We have (most likely) consumed some input.

        /*
        // If this is a source, it's broken.
        if(d->source_p()) {
          std::cerr << "block_executor: source " << m
                    << " produced no output.  We're marking it DONE.\n";
          // FIXME maybe we ought to raise an exception...
          goto were_done;
        }
        */

        // Have the caller try again...
        return READY_NO_OUTPUT;
    }
    // Not reachable: every branch above returns or jumps to a label.
    assert(0);

were_done:
    LOG(*d_log << "  were_done\n");
    d->set_done(true);
    return DONE;
}
585 
586 } /* namespace gr */
587