1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifndef INCLUDED_GR_BASIC_BLOCK_H
24 #define INCLUDED_GR_BASIC_BLOCK_H
25 
26 #include <gnuradio/api.h>
27 #include <gnuradio/io_signature.h>
28 #include <gnuradio/msg_accepter.h>
29 #include <gnuradio/runtime_types.h>
30 #include <gnuradio/sptr_magic.h>
31 #include <gnuradio/thread/thread.h>
32 #include <boost/enable_shared_from_this.hpp>
33 #include <boost/foreach.hpp>
34 #include <boost/function.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #if (BOOST_VERSION >= 106000)
37 #include <boost/bind/bind.hpp>
38 #endif
39 #include <deque>
40 #include <iostream>
41 #include <map>
42 #include <string>
43 
44 #include <gnuradio/rpcregisterhelpers.h>
45 
46 namespace gr {
47 #if (BOOST_VERSION >= 106000)
48 using namespace boost::placeholders;
49 #endif
50 /*!
51  * \brief The abstract base class for all signal processing blocks.
52  * \ingroup internal
53  *
54  * Basic blocks are the bare abstraction of an entity that has a
55  * name, a set of inputs and outputs, and a message queue.  These
56  * are never instantiated directly; rather, this is the abstract
57  * parent class of both gr_hier_block, which is a recursive
58  * container, and block, which implements actual signal
59  * processing functions.
60  */
61 class GR_RUNTIME_API basic_block : public msg_accepter,
62                                    public boost::enable_shared_from_this<basic_block>
63 {
64     typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
65 
66 private:
67     typedef std::map<pmt::pmt_t, msg_handler_t, pmt::comparator> d_msg_handlers_t;
68     d_msg_handlers_t d_msg_handlers;
69 
70     typedef std::deque<pmt::pmt_t> msg_queue_t;
71     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator> msg_queue_map_t;
72     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator>::iterator
73         msg_queue_map_itr;
74     std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comparator>
75         msg_queue_ready;
76 
77     gr::thread::mutex mutex; //< protects all vars
78 
79 protected:
80     friend class flowgraph;
81     friend class flat_flowgraph; // TODO: will be redundant
82     friend class tpb_thread_body;
83 
84     enum vcolor { WHITE, GREY, BLACK };
85 
86     std::string d_name;
87     gr::io_signature::sptr d_input_signature;
88     gr::io_signature::sptr d_output_signature;
89     long d_unique_id;
90     long d_symbolic_id;
91     std::string d_symbol_name;
92     std::string d_symbol_alias;
93     vcolor d_color;
94     bool d_rpc_set;
95 
96     msg_queue_map_t msg_queue;
97     std::vector<rpcbasic_sptr> d_rpc_vars; // container for all RPC variables
98 
basic_block(void)99     basic_block(void) {} // allows pure virtual interface sub-classes
100 
101     //! Protected constructor prevents instantiation by non-derived classes
102     basic_block(const std::string& name,
103                 gr::io_signature::sptr input_signature,
104                 gr::io_signature::sptr output_signature);
105 
106     //! may only be called during constructor
set_input_signature(gr::io_signature::sptr iosig)107     void set_input_signature(gr::io_signature::sptr iosig) { d_input_signature = iosig; }
108 
109     //! may only be called during constructor
set_output_signature(gr::io_signature::sptr iosig)110     void set_output_signature(gr::io_signature::sptr iosig)
111     {
112         d_output_signature = iosig;
113     }
114 
115     /*!
116      * \brief Allow the flowgraph to set for sorting and partitioning
117      */
set_color(vcolor color)118     void set_color(vcolor color) { d_color = color; }
color()119     vcolor color() const { return d_color; }
120 
121     /*!
122      * \brief Tests if there is a handler attached to port \p which_port
123      */
has_msg_handler(pmt::pmt_t which_port)124     virtual bool has_msg_handler(pmt::pmt_t which_port)
125     {
126         return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
127     }
128 
129     /*
130      * This function is called by the runtime system to dispatch messages.
131      *
132      * The thread-safety guarantees mentioned in set_msg_handler are
133      * implemented by the callers of this method.
134      */
dispatch_msg(pmt::pmt_t which_port,pmt::pmt_t msg)135     virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
136     {
137         // AA Update this
138         if (has_msg_handler(which_port)) {   // Is there a handler?
139             d_msg_handlers[which_port](msg); // Yes, invoke it.
140         }
141     }
142 
143     // Message passing interface
144     pmt::pmt_t d_message_subscribers;
145 
146 public:
147     pmt::pmt_t message_subscribers(pmt::pmt_t port);
148     virtual ~basic_block();
unique_id()149     long unique_id() const { return d_unique_id; }
symbolic_id()150     long symbolic_id() const { return d_symbolic_id; }
151 
152     /*! The name of the block */
name()153     std::string name() const { return d_name; }
154 
155     /*!
156      * The sybolic name of the block, which is used in the
157      * block_registry. The name is assigned by the block's constructor
158      * and never changes during the life of the block.
159      */
symbol_name()160     std::string symbol_name() const { return d_symbol_name; }
identifier()161     std::string identifier() const
162     {
163         return this->name() + "(" + std::to_string(this->unique_id()) + ")";
164     }
165 
input_signature()166     gr::io_signature::sptr input_signature() const { return d_input_signature; }
output_signature()167     gr::io_signature::sptr output_signature() const { return d_output_signature; }
168     basic_block_sptr to_basic_block(); // Needed for Python type coercion
169 
170     /*!
171      * True if the block has an alias (see set_block_alias).
172      */
alias_set()173     bool alias_set() const { return !d_symbol_alias.empty(); }
174 
175     /*!
176      * Returns the block's alias as a string.
177      */
alias()178     std::string alias() const { return alias_set() ? d_symbol_alias : symbol_name(); }
179 
180     /*!
181      * Returns the block's alias as PMT.
182      */
alias_pmt()183     pmt::pmt_t alias_pmt() const { return pmt::intern(alias()); }
184 
185     /*!
186      * Set's a new alias for the block; also adds an entry into the
187      * block_registry to get the block using either the alias or the
188      * original symbol name.
189      */
190     void set_block_alias(std::string name);
191 
192     // ** Message passing interface **
193     void message_port_register_in(pmt::pmt_t port_id);
194     void message_port_register_out(pmt::pmt_t port_id);
195     void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
196     void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
197     void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
198 
message_port_is_hier(pmt::pmt_t port_id)199     virtual bool message_port_is_hier(pmt::pmt_t port_id)
200     {
201         (void)port_id;
202         return false;
203     }
message_port_is_hier_in(pmt::pmt_t port_id)204     virtual bool message_port_is_hier_in(pmt::pmt_t port_id)
205     {
206         (void)port_id;
207         return false;
208     }
message_port_is_hier_out(pmt::pmt_t port_id)209     virtual bool message_port_is_hier_out(pmt::pmt_t port_id)
210     {
211         (void)port_id;
212         return false;
213     }
214 
215     /*!
216      * \brief Get input message port names.
217      *
218      * Returns the available input message ports for a block. The
219      * return object is a PMT vector that is filled with PMT symbols.
220      */
221     pmt::pmt_t message_ports_in();
222 
223     /*!
224      * \brief Get output message port names.
225      *
226      * Returns the available output message ports for a block. The
227      * return object is a PMT vector that is filled with PMT symbols.
228      */
229     pmt::pmt_t message_ports_out();
230 
231     /*!
232      * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
233      */
234     void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
235 
236     //! is the queue empty?
empty_p(pmt::pmt_t which_port)237     bool empty_p(pmt::pmt_t which_port)
238     {
239         if (msg_queue.find(which_port) == msg_queue.end())
240             throw std::runtime_error("port does not exist!");
241         return msg_queue[which_port].empty();
242     }
empty_p()243     bool empty_p()
244     {
245         bool rv = true;
246         BOOST_FOREACH (msg_queue_map_t::value_type& i, msg_queue) {
247             rv &= msg_queue[i.first].empty();
248         }
249         return rv;
250     }
251 
252     //! are all msg ports with handlers empty?
empty_handled_p(pmt::pmt_t which_port)253     bool empty_handled_p(pmt::pmt_t which_port)
254     {
255         return (empty_p(which_port) || !has_msg_handler(which_port));
256     }
empty_handled_p()257     bool empty_handled_p()
258     {
259         bool rv = true;
260         BOOST_FOREACH (msg_queue_map_t::value_type& i, msg_queue) {
261             rv &= empty_handled_p(i.first);
262         }
263         return rv;
264     }
265 
266     //! How many messages in the queue?
nmsgs(pmt::pmt_t which_port)267     size_t nmsgs(pmt::pmt_t which_port)
268     {
269         if (msg_queue.find(which_port) == msg_queue.end())
270             throw std::runtime_error("port does not exist!");
271         return msg_queue[which_port].size();
272     }
273 
274     //| Acquires and release the mutex
275     void insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg);
276     /*!
277      * \returns returns pmt at head of queue or pmt::pmt_t() if empty.
278      */
279     pmt::pmt_t delete_head_nowait(pmt::pmt_t which_port);
280 
get_iterator(pmt::pmt_t which_port)281     msg_queue_t::iterator get_iterator(pmt::pmt_t which_port)
282     {
283         return msg_queue[which_port].begin();
284     }
285 
erase_msg(pmt::pmt_t which_port,msg_queue_t::iterator it)286     void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it)
287     {
288         msg_queue[which_port].erase(it);
289     }
290 
has_msg_port(pmt::pmt_t which_port)291     virtual bool has_msg_port(pmt::pmt_t which_port)
292     {
293         if (msg_queue.find(which_port) != msg_queue.end()) {
294             return true;
295         }
296         if (pmt::dict_has_key(d_message_subscribers, which_port)) {
297             return true;
298         }
299         return false;
300     }
301 
get_msg_map(void)302     const msg_queue_map_t& get_msg_map(void) const { return msg_queue; }
303 
304 #ifdef GR_CTRLPORT
305     /*!
306      * \brief Add an RPC variable (get or set).
307      *
308      * Using controlport, we create new getters/setters and need to
309      * store them. Each block has a vector to do this, and these never
310      * need to be accessed again once they are registered with the RPC
311      * backend. This function takes a
312      * boost::shared_sptr<rpcbasic_base> so that when the block is
313      * deleted, all RPC registered variables are cleaned up.
314      *
315      * \param s an rpcbasic_sptr of the new RPC variable register to store.
316      */
add_rpc_variable(rpcbasic_sptr s)317     void add_rpc_variable(rpcbasic_sptr s) { d_rpc_vars.push_back(s); }
318 #endif /* GR_CTRLPORT */
319 
320     /*!
321      * \brief Set up the RPC registered variables.
322      *
323      * This must be overloaded by a block that wants to use
324      * controlport. This is where rpcbasic_register_{get,set} pointers
325      * are created, which then get wrapped as shared pointers
326      * (rpcbasic_sptr(...)) and stored using add_rpc_variable.
327      */
setup_rpc()328     virtual void setup_rpc(){};
329 
330     /*!
331      * \brief Ask if this block has been registered to the RPC.
332      *
333      * We can only register a block once, so we use this to protect us
334      * from calling it multiple times.
335      */
is_rpc_set()336     bool is_rpc_set() { return d_rpc_set; }
337 
338     /*!
339      * \brief When the block is registered with the RPC, set this.
340      */
rpc_set()341     void rpc_set() { d_rpc_set = true; }
342 
343     /*!
344      * \brief Confirm that ninputs and noutputs is an acceptable combination.
345      *
346      * \param ninputs	number of input streams connected
347      * \param noutputs	number of output streams connected
348      *
349      * \returns true if this is a valid configuration for this block.
350      *
351      * This function is called by the runtime system whenever the
352      * topology changes. Most classes do not need to override this.
353      * This check is in addition to the constraints specified by the
354      * input and output gr::io_signatures.
355      */
check_topology(int ninputs,int noutputs)356     virtual bool check_topology(int ninputs, int noutputs)
357     {
358         (void)ninputs;
359         (void)noutputs;
360         return true;
361     }
362 
363     /*!
364      * \brief Set the callback that is fired when messages are available.
365      *
366      * \p msg_handler can be any kind of function pointer or function object
367      * that has the signature:
368      * <pre>
369      *    void msg_handler(pmt::pmt msg);
370      * </pre>
371      *
372      * (You may want to use boost::bind to massage your callable into
373      * the correct form.  See gr::blocks::nop for an example that sets
374      * up a class method as the callback.)
375      *
376      * Blocks that desire to handle messages must call this method in
377      * their constructors to register the handler that will be invoked
378      * when messages are available.
379      *
380      * If the block inherits from block, the runtime system will
381      * ensure that msg_handler is called in a thread-safe manner, such
382      * that work and msg_handler will never be called concurrently.
383      * This allows msg_handler to update state variables without
384      * having to worry about thread-safety issues with work,
385      * general_work or another invocation of msg_handler.
386      *
387      * If the block inherits from hier_block2, the runtime system
388      * will ensure that no reentrant calls are made to msg_handler.
389      */
390     template <typename T>
set_msg_handler(pmt::pmt_t which_port,T msg_handler)391     void set_msg_handler(pmt::pmt_t which_port, T msg_handler)
392     {
393         if (msg_queue.find(which_port) == msg_queue.end()) {
394             throw std::runtime_error(
395                 "attempt to set_msg_handler() on bad input message port!");
396         }
397         d_msg_handlers[which_port] = msg_handler_t(msg_handler);
398     }
399 
400     virtual void set_processor_affinity(const std::vector<int>& mask) = 0;
401 
402     virtual void unset_processor_affinity() = 0;
403 
404     virtual std::vector<int> processor_affinity() = 0;
405 
406     virtual void set_log_level(std::string level) = 0;
407 
408     virtual std::string log_level() = 0;
409 };
410 
411 inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs)
412 {
413     return lhs->unique_id() < rhs->unique_id();
414 }
415 
416 typedef std::vector<basic_block_sptr> basic_block_vector_t;
417 typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t;
418 
419 GR_RUNTIME_API long basic_block_ncurrently_allocated();
420 
421 inline std::ostream& operator<<(std::ostream& os, basic_block_sptr basic_block)
422 {
423     os << basic_block->identifier();
424     return os;
425 }
426 
427 } /* namespace gr */
428 
429 #endif /* INCLUDED_GR_BASIC_BLOCK_H */
430