1 /* -*- c++ -*- */
2 /*
3 * Copyright 2004,2008-2010,2013,2017 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gnuradio/block.h>
28 #include <gnuradio/block_detail.h>
29 #include <gnuradio/buffer.h>
30 #include <gnuradio/prefs.h>
31 #include <assert.h>
32 #include <block_executor.h>
33 #include <stdio.h>
34 #include <boost/format.hpp>
35 #include <boost/thread.hpp>
36 #include <iostream>
37 #include <limits>
38
39 namespace gr {
40
41 // must be defined to either 0 or 1
42 #define ENABLE_LOGGING 0
43
44 #if (ENABLE_LOGGING)
45 #define LOG(x) \
46 do { \
47 x; \
48 } while (0)
49 #else
50 #define LOG(x) \
51 do { \
52 ; \
53 } while (0)
54 #endif
55
56 static int which_scheduler = 0;
57
round_up(unsigned int n,unsigned int multiple)58 inline static unsigned int round_up(unsigned int n, unsigned int multiple)
59 {
60 return ((n + multiple - 1) / multiple) * multiple;
61 }
62
round_down(unsigned int n,unsigned int multiple)63 inline static unsigned int round_down(unsigned int n, unsigned int multiple)
64 {
65 return (n / multiple) * multiple;
66 }
67
68 //
69 // Return minimum available write space in all our downstream
70 // buffers or -1 if we're output blocked and the output we're
71 // blocked on is done.
72 //
73 static int
min_available_space(block_detail * d,int output_multiple,int min_noutput_items)74 min_available_space(block_detail* d, int output_multiple, int min_noutput_items)
75 {
76 int min_space = std::numeric_limits<int>::max();
77 if (min_noutput_items == 0)
78 min_noutput_items = 1;
79 for (int i = 0; i < d->noutputs(); i++) {
80 buffer_sptr out_buf = d->output(i);
81 gr::thread::scoped_lock guard(*out_buf->mutex());
82 int avail_n = round_down(out_buf->space_available(), output_multiple);
83 int best_n = round_down(out_buf->bufsize() / 2, output_multiple);
84 if (best_n < min_noutput_items)
85 throw std::runtime_error("Buffer too small for min_noutput_items");
86 int n = std::min(avail_n, best_n);
87 if (n < min_noutput_items) { // We're blocked on output.
88 if (out_buf->done()) { // Downstream is done, therefore we're done.
89 return -1;
90 }
91 return 0;
92 }
93 min_space = std::min(min_space, n);
94 }
95 return min_space;
96 }
97
propagate_tags(block::tag_propagation_policy_t policy,block_detail * d,const std::vector<uint64_t> & start_nitems_read,double rrate,mpq_class & mp_rrate,bool use_fp_rrate,std::vector<tag_t> & rtags,long block_id)98 static bool propagate_tags(block::tag_propagation_policy_t policy,
99 block_detail* d,
100 const std::vector<uint64_t>& start_nitems_read,
101 double rrate,
102 mpq_class& mp_rrate,
103 bool use_fp_rrate,
104 std::vector<tag_t>& rtags,
105 long block_id)
106 {
107 static const mpq_class one_half(1, 2);
108
109 // Move tags downstream
110 // if a sink, we don't need to move downstream
111 if (d->sink_p()) {
112 return true;
113 }
114
115 switch (policy) {
116 case block::TPP_DONT:
117 case block::TPP_CUSTOM:
118 return true;
119 case block::TPP_ALL_TO_ALL: {
120 // every tag on every input propagates to everyone downstream
121 std::vector<buffer_sptr> out_buf;
122
123 for (int i = 0; i < d->ninputs(); i++) {
124 d->get_tags_in_range(
125 rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
126
127 if (rtags.empty()) {
128 continue;
129 }
130
131 if (out_buf.empty()) {
132 out_buf.reserve(d->noutputs());
133 for (int o = 0; o < d->noutputs(); o++)
134 out_buf.push_back(d->output(o));
135 }
136
137 std::vector<tag_t>::iterator t;
138 if (rrate == 1.0) {
139 for (t = rtags.begin(); t != rtags.end(); t++) {
140 for (int o = 0; o < d->noutputs(); o++)
141 out_buf[o]->add_item_tag(*t);
142 }
143 } else if (use_fp_rrate) {
144 for (t = rtags.begin(); t != rtags.end(); t++) {
145 tag_t new_tag = *t;
146 new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
147 for (int o = 0; o < d->noutputs(); o++)
148 out_buf[o]->add_item_tag(new_tag);
149 }
150 } else {
151 mpz_class offset;
152 for (t = rtags.begin(); t != rtags.end(); t++) {
153 tag_t new_tag = *t;
154 mpz_import(offset.get_mpz_t(),
155 1,
156 1,
157 sizeof(new_tag.offset),
158 0,
159 0,
160 &new_tag.offset);
161 offset = offset * mp_rrate + one_half;
162 new_tag.offset = offset.get_ui();
163 for (int o = 0; o < d->noutputs(); o++)
164 out_buf[o]->add_item_tag(new_tag);
165 }
166 }
167 }
168 } break;
169 case block::TPP_ONE_TO_ONE:
170 // tags from input i only go to output i
171 // this requires d->ninputs() == d->noutputs; this is checked when this
172 // type of tag-propagation system is selected in block_detail
173 if (d->ninputs() == d->noutputs()) {
174 buffer_sptr out_buf;
175
176 for (int i = 0; i < d->ninputs(); i++) {
177 d->get_tags_in_range(
178 rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
179
180 if (rtags.empty()) {
181 continue;
182 }
183
184 out_buf = d->output(i);
185
186 std::vector<tag_t>::iterator t;
187 if (rrate == 1.0) {
188 for (t = rtags.begin(); t != rtags.end(); t++) {
189 out_buf->add_item_tag(*t);
190 }
191 } else if (use_fp_rrate) {
192 for (t = rtags.begin(); t != rtags.end(); t++) {
193 tag_t new_tag = *t;
194 new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
195 out_buf->add_item_tag(new_tag);
196 }
197 } else {
198 mpz_class offset;
199 for (t = rtags.begin(); t != rtags.end(); t++) {
200 tag_t new_tag = *t;
201 mpz_import(offset.get_mpz_t(),
202 1,
203 1,
204 sizeof(new_tag.offset),
205 0,
206 0,
207 &new_tag.offset);
208 offset = offset * mp_rrate + one_half;
209 new_tag.offset = offset.get_ui();
210 out_buf->add_item_tag(new_tag);
211 }
212 }
213 }
214 } else {
215 std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' "
216 "requires ninputs == noutputs"
217 << std::endl;
218 return false;
219 }
220 break;
221 default:
222 return true;
223 }
224 return true;
225 }
226
block_executor(block_sptr block,int max_noutput_items)227 block_executor::block_executor(block_sptr block, int max_noutput_items)
228 : d_block(block), d_log(0), d_max_noutput_items(max_noutput_items)
229 {
230 if (ENABLE_LOGGING) {
231 std::string name = str(boost::format("sst-%03d.log") % which_scheduler++);
232 d_log = new std::ofstream(name.c_str());
233 std::unitbuf(*d_log); // make it unbuffered...
234 *d_log << "block_executor: " << d_block << std::endl;
235 }
236
237 #ifdef GR_PERFORMANCE_COUNTERS
238 prefs* prefs = prefs::singleton();
239 d_use_pc = prefs->get_bool("PerfCounters", "on", false);
240 #endif /* GR_PERFORMANCE_COUNTERS */
241
242 d_block->start(); // enable any drivers, etc.
243 }
244
~block_executor()245 block_executor::~block_executor()
246 {
247 if (ENABLE_LOGGING)
248 delete d_log;
249
250 d_block->stop(); // stop any drivers, etc.
251 }
252
run_one_iteration()253 block_executor::state block_executor::run_one_iteration()
254 {
255 int noutput_items;
256 int max_items_avail;
257 int max_noutput_items;
258 int new_alignment = 0;
259 int alignment_state = -1;
260
261 block* m = d_block.get();
262 block_detail* d = m->detail().get();
263
264 LOG(*d_log << std::endl << m);
265
266 max_noutput_items = round_down(d_max_noutput_items, m->output_multiple());
267
268 if (d->done()) {
269 assert(0);
270 return DONE;
271 }
272
273 if (d->source_p()) {
274 d_ninput_items_required.resize(0);
275 d_ninput_items.resize(0);
276 d_input_items.resize(0);
277 d_input_done.resize(0);
278 d_output_items.resize(d->noutputs());
279 d_start_nitems_read.resize(0);
280
281 // determine the minimum available output space
282 noutput_items =
283 min_available_space(d, m->output_multiple(), m->min_noutput_items());
284 noutput_items = std::min(noutput_items, max_noutput_items);
285 LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
286 if (noutput_items == -1) // we're done
287 goto were_done;
288
289 if (noutput_items == 0) { // we're output blocked
290 LOG(*d_log << " BLKD_OUT\n");
291 return BLKD_OUT;
292 }
293
294 goto setup_call_to_work; // jump to common code
295 }
296
297 else if (d->sink_p()) {
298 d_ninput_items_required.resize(d->ninputs());
299 d_ninput_items.resize(d->ninputs());
300 d_input_items.resize(d->ninputs());
301 d_input_done.resize(d->ninputs());
302 d_output_items.resize(0);
303 d_start_nitems_read.resize(d->ninputs());
304 LOG(*d_log << " sink\n");
305
306 max_items_avail = 0;
307 for (int i = 0; i < d->ninputs(); i++) {
308 {
309 /*
310 * Acquire the mutex and grab local copies of items_available and done.
311 */
312 buffer_reader_sptr in_buf = d->input(i);
313 gr::thread::scoped_lock guard(*in_buf->mutex());
314 d_ninput_items[i] = in_buf->items_available();
315 d_input_done[i] = in_buf->done();
316 }
317
318 LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i]
319 << std::endl);
320 LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i]
321 << std::endl);
322
323 if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
324 goto were_done;
325
326 max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
327 }
328
329 // take a swag at how much output we can sink
330 noutput_items = (int)(max_items_avail * m->relative_rate());
331 noutput_items = round_down(noutput_items, m->output_multiple());
332 noutput_items = std::min(noutput_items, max_noutput_items);
333 LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
334 LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
335
336 if (noutput_items == 0) { // we're blocked on input
337 LOG(*d_log << " BLKD_IN\n");
338 return BLKD_IN;
339 }
340
341 goto try_again; // Jump to code shared with regular case.
342 }
343
344 else {
345 // do the regular thing
346 d_ninput_items_required.resize(d->ninputs());
347 d_ninput_items.resize(d->ninputs());
348 d_input_items.resize(d->ninputs());
349 d_input_done.resize(d->ninputs());
350 d_output_items.resize(d->noutputs());
351 d_start_nitems_read.resize(d->ninputs());
352
353 max_items_avail = 0;
354 for (int i = 0; i < d->ninputs(); i++) {
355 {
356 /*
357 * Acquire the mutex and grab local copies of items_available and done.
358 */
359 buffer_reader_sptr in_buf = d->input(i);
360 gr::thread::scoped_lock guard(*in_buf->mutex());
361 d_ninput_items[i] = in_buf->items_available();
362 d_input_done[i] = in_buf->done();
363 }
364 max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
365 }
366
367 // determine the minimum available output space
368 noutput_items =
369 min_available_space(d, m->output_multiple(), m->min_noutput_items());
370 if (ENABLE_LOGGING) {
371 *d_log << " regular ";
372 *d_log << m->relative_rate_i() << ":" << m->relative_rate_d() << std::endl;
373 *d_log << " max_items_avail = " << max_items_avail << std::endl;
374 *d_log << " noutput_items = " << noutput_items << std::endl;
375 }
376 if (noutput_items == -1) // we're done
377 goto were_done;
378
379 if (noutput_items == 0) { // we're output blocked
380 LOG(*d_log << " BLKD_OUT\n");
381 return BLKD_OUT;
382 }
383
384 try_again:
385 if (m->fixed_rate()) {
386 // try to work it forward starting with max_items_avail.
387 // We want to try to consume all the input we've got.
388 int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
389
390 // only test this if we specifically set the output_multiple
391 if (m->output_multiple_set())
392 reqd_noutput_items = round_down(reqd_noutput_items, m->output_multiple());
393
394 if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
395 noutput_items = reqd_noutput_items;
396
397 // if we need this many outputs, overrule the max_noutput_items setting
398 max_noutput_items = std::max(m->output_multiple(), max_noutput_items);
399 }
400 noutput_items = std::min(noutput_items, max_noutput_items);
401
402 // Check if we're still unaligned; use up items until we're
403 // aligned again. Otherwise, make sure we set the alignment
404 // requirement.
405 if (!m->output_multiple_set()) {
406 if (m->is_unaligned()) {
407 // When unaligned, don't just set noutput_items to the remaining
408 // samples to meet alignment; this causes too much overhead in
409 // requiring a premature call back here. Set the maximum amount
410 // of samples to handle unalignment and get us back aligned.
411 if (noutput_items >= m->unaligned()) {
412 noutput_items = round_up(noutput_items, m->alignment()) -
413 (m->alignment() - m->unaligned());
414 new_alignment = 0;
415 } else {
416 new_alignment = m->unaligned() - noutput_items;
417 }
418 alignment_state = 0;
419 } else if (noutput_items < m->alignment()) {
420 // if we don't have enough for an aligned call, keep track of
421 // misalignment, set unaligned flag, and proceed.
422 new_alignment = m->alignment() - noutput_items;
423 m->set_unaligned(new_alignment);
424 m->set_is_unaligned(true);
425 alignment_state = 1;
426 } else {
427 // enough to round down to the nearest alignment and process.
428 noutput_items = round_down(noutput_items, m->alignment());
429 m->set_is_unaligned(false);
430 alignment_state = 2;
431 }
432 }
433
434 // ask the block how much input they need to produce noutput_items
435 m->forecast(noutput_items, d_ninput_items_required);
436
437 // See if we've got sufficient input available and make sure we
438 // didn't overflow on the input.
439 int i;
440 for (i = 0; i < d->ninputs(); i++) {
441 if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
442 break;
443
444 if (d_ninput_items_required[i] < 0) {
445 std::cerr << "\nsched: <block " << m->name() << " (" << m->unique_id()
446 << ")>"
447 << " thinks its ninput_items required is "
448 << d_ninput_items_required[i] << " and cannot be negative.\n"
449 << "Some parameterization is wrong. "
450 << "Too large a decimation value?\n\n";
451 goto were_done;
452 }
453 }
454
455 if (i < d->ninputs()) { // not enough input on input[i]
456 // if we can, try reducing the size of our output request
457 if (noutput_items > m->output_multiple()) {
458 noutput_items /= 2;
459 noutput_items = round_up(noutput_items, m->output_multiple());
460 goto try_again;
461 }
462
463 // We're blocked on input
464 LOG(*d_log << " BLKD_IN\n");
465 if (d_input_done[i]) // If the upstream block is done, we're done
466 goto were_done;
467
468 // Is it possible to ever fulfill this request?
469 buffer_reader_sptr in_buf = d->input(i);
470 if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
471 // Nope, never going to happen...
472 std::cerr
473 << "\nsched: <block " << m->name() << " (" << m->unique_id() << ")>"
474 << " is requesting more input data\n"
475 << " than we can provide.\n"
476 << " ninput_items_required = " << d_ninput_items_required[i] << "\n"
477 << " max_possible_items_available = "
478 << in_buf->max_possible_items_available() << "\n"
479 << " If this is a filter, consider reducing the number of taps.\n";
480 goto were_done;
481 }
482
483 // If we were made unaligned in this round but return here without
484 // processing; reset the unalignment claim before next entry.
485 if (alignment_state == 1) {
486 m->set_unaligned(0);
487 m->set_is_unaligned(false);
488 }
489 return BLKD_IN;
490 }
491
492 // We've got enough data on each input to produce noutput_items.
493 // Finish setting up the call to work.
494 for (int i = 0; i < d->ninputs(); i++)
495 d_input_items[i] = d->input(i)->read_pointer();
496
497 setup_call_to_work:
498
499 d->d_produce_or = 0;
500 for (int i = 0; i < d->noutputs(); i++)
501 d_output_items[i] = d->output(i)->write_pointer();
502
503 // determine where to start looking for new tags
504 for (int i = 0; i < d->ninputs(); i++)
505 d_start_nitems_read[i] = d->nitems_read(i);
506
507 #ifdef GR_PERFORMANCE_COUNTERS
508 if (d_use_pc)
509 d->start_perf_counters();
510 #endif /* GR_PERFORMANCE_COUNTERS */
511
512 // Do the actual work of the block
513 int n =
514 m->general_work(noutput_items, d_ninput_items, d_input_items, d_output_items);
515
516 #ifdef GR_PERFORMANCE_COUNTERS
517 if (d_use_pc)
518 d->stop_perf_counters(noutput_items, n);
519 #endif /* GR_PERFORMANCE_COUNTERS */
520
521 LOG(*d_log << " general_work: noutput_items = " << noutput_items
522 << " result = " << n << std::endl);
523
524 // Adjust number of unaligned items left to process
525 if (m->is_unaligned()) {
526 m->set_unaligned(new_alignment);
527 m->set_is_unaligned(m->unaligned() != 0);
528 }
529
530 // Now propagate the tags based on the new relative rate
531 if (!propagate_tags(m->tag_propagation_policy(),
532 d,
533 d_start_nitems_read,
534 m->relative_rate(),
535 m->mp_relative_rate(),
536 m->update_rate(),
537 d_returned_tags,
538 m->unique_id()))
539 goto were_done;
540
541 if (n == block::WORK_DONE)
542 goto were_done;
543
544 if (n != block::WORK_CALLED_PRODUCE)
545 d->produce_each(n); // advance write pointers
546
547 // For some blocks that can change their produce/consume ratio
548 // (the relative_rate), we might want to automatically update
549 // based on the amount of items written/read.
550 // In the block constructor, use enable_update_rate(true).
551 if (m->update_rate()) {
552 // rrate = ((double)(m->nitems_written(0))) / ((double)m->nitems_read(0));
553 // if(rrate > 0.0)
554 // m->set_relative_rate(rrate);
555 if ((n > 0) && (d->consumed() > 0))
556 m->set_relative_rate((uint64_t)n, (uint64_t)d->consumed());
557 }
558
559 if (d->d_produce_or > 0) // block produced something
560 return READY;
561
562 // We didn't produce any output even though we called general_work.
563 // We have (most likely) consumed some input.
564
565 /*
566 // If this is a source, it's broken.
567 if(d->source_p()) {
568 std::cerr << "block_executor: source " << m
569 << " produced no output. We're marking it DONE.\n";
570 // FIXME maybe we ought to raise an exception...
571 goto were_done;
572 }
573 */
574
575 // Have the caller try again...
576 return READY_NO_OUTPUT;
577 }
578 assert(0);
579
580 were_done:
581 LOG(*d_log << " were_done\n");
582 d->set_done(true);
583 return DONE;
584 }
585
586 } /* namespace gr */
587