1 /* -*- c++ -*- */ 2 /* 3 * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc. 4 * 5 * This file is part of GNU Radio 6 * 7 * GNU Radio is free software; you can redistribute it and/or modify 8 * it under the terms of the GNU General Public License as published by 9 * the Free Software Foundation; either version 3, or (at your option) 10 * any later version. 11 * 12 * GNU Radio is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU General Public License for more details. 16 * 17 * You should have received a copy of the GNU General Public License 18 * along with GNU Radio; see the file COPYING. If not, write to 19 * the Free Software Foundation, Inc., 51 Franklin Street, 20 * Boston, MA 02110-1301, USA. 21 */ 22 23 #ifndef INCLUDED_GR_BASIC_BLOCK_H 24 #define INCLUDED_GR_BASIC_BLOCK_H 25 26 #include <gnuradio/api.h> 27 #include <gnuradio/io_signature.h> 28 #include <gnuradio/msg_accepter.h> 29 #include <gnuradio/runtime_types.h> 30 #include <gnuradio/sptr_magic.h> 31 #include <gnuradio/thread/thread.h> 32 #include <boost/enable_shared_from_this.hpp> 33 #include <boost/foreach.hpp> 34 #include <boost/function.hpp> 35 #include <boost/thread/condition_variable.hpp> 36 #if (BOOST_VERSION >= 106000) 37 #include <boost/bind/bind.hpp> 38 #endif 39 #include <deque> 40 #include <iostream> 41 #include <map> 42 #include <string> 43 44 #include <gnuradio/rpcregisterhelpers.h> 45 46 namespace gr { 47 #if (BOOST_VERSION >= 106000) 48 using namespace boost::placeholders; 49 #endif 50 /*! 51 * \brief The abstract base class for all signal processing blocks. 52 * \ingroup internal 53 * 54 * Basic blocks are the bare abstraction of an entity that has a 55 * name, a set of inputs and outputs, and a message queue. These 56 * are never instantiated directly; rather, this is the abstract 57 * parent class of both gr_hier_block, which is a recursive 58 * container, and block, which implements actual signal 59 * processing functions. 60 */ 61 class GR_RUNTIME_API basic_block : public msg_accepter, 62 public boost::enable_shared_from_this<basic_block> 63 { 64 typedef boost::function<void(pmt::pmt_t)> msg_handler_t; 65 66 private: 67 typedef std::map<pmt::pmt_t, msg_handler_t, pmt::comparator> d_msg_handlers_t; 68 d_msg_handlers_t d_msg_handlers; 69 70 typedef std::deque<pmt::pmt_t> msg_queue_t; 71 typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator> msg_queue_map_t; 72 typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator>::iterator 73 msg_queue_map_itr; 74 std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comparator> 75 msg_queue_ready; 76 77 gr::thread::mutex mutex; //< protects all vars 78 79 protected: 80 friend class flowgraph; 81 friend class flat_flowgraph; // TODO: will be redundant 82 friend class tpb_thread_body; 83 84 enum vcolor { WHITE, GREY, BLACK }; 85 86 std::string d_name; 87 gr::io_signature::sptr d_input_signature; 88 gr::io_signature::sptr d_output_signature; 89 long d_unique_id; 90 long d_symbolic_id; 91 std::string d_symbol_name; 92 std::string d_symbol_alias; 93 vcolor d_color; 94 bool d_rpc_set; 95 96 msg_queue_map_t msg_queue; 97 std::vector<rpcbasic_sptr> d_rpc_vars; // container for all RPC variables 98 basic_block(void)99 basic_block(void) {} // allows pure virtual interface sub-classes 100 101 //! Protected constructor prevents instantiation by non-derived classes 102 basic_block(const std::string& name, 103 gr::io_signature::sptr input_signature, 104 gr::io_signature::sptr output_signature); 105 106 //! may only be called during constructor set_input_signature(gr::io_signature::sptr iosig)107 void set_input_signature(gr::io_signature::sptr iosig) { d_input_signature = iosig; } 108 109 //! may only be called during constructor set_output_signature(gr::io_signature::sptr iosig)110 void set_output_signature(gr::io_signature::sptr iosig) 111 { 112 d_output_signature = iosig; 113 } 114 115 /*! 116 * \brief Allow the flowgraph to set for sorting and partitioning 117 */ set_color(vcolor color)118 void set_color(vcolor color) { d_color = color; } color()119 vcolor color() const { return d_color; } 120 121 /*! 122 * \brief Tests if there is a handler attached to port \p which_port 123 */ has_msg_handler(pmt::pmt_t which_port)124 virtual bool has_msg_handler(pmt::pmt_t which_port) 125 { 126 return (d_msg_handlers.find(which_port) != d_msg_handlers.end()); 127 } 128 129 /* 130 * This function is called by the runtime system to dispatch messages. 131 * 132 * The thread-safety guarantees mentioned in set_msg_handler are 133 * implemented by the callers of this method. 134 */ dispatch_msg(pmt::pmt_t which_port,pmt::pmt_t msg)135 virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) 136 { 137 // AA Update this 138 if (has_msg_handler(which_port)) { // Is there a handler? 139 d_msg_handlers[which_port](msg); // Yes, invoke it. 140 } 141 } 142 143 // Message passing interface 144 pmt::pmt_t d_message_subscribers; 145 146 public: 147 pmt::pmt_t message_subscribers(pmt::pmt_t port); 148 virtual ~basic_block(); unique_id()149 long unique_id() const { return d_unique_id; } symbolic_id()150 long symbolic_id() const { return d_symbolic_id; } 151 152 /*! The name of the block */ name()153 std::string name() const { return d_name; } 154 155 /*! 156 * The sybolic name of the block, which is used in the 157 * block_registry. The name is assigned by the block's constructor 158 * and never changes during the life of the block. 159 */ symbol_name()160 std::string symbol_name() const { return d_symbol_name; } identifier()161 std::string identifier() const 162 { 163 return this->name() + "(" + std::to_string(this->unique_id()) + ")"; 164 } 165 input_signature()166 gr::io_signature::sptr input_signature() const { return d_input_signature; } output_signature()167 gr::io_signature::sptr output_signature() const { return d_output_signature; } 168 basic_block_sptr to_basic_block(); // Needed for Python type coercion 169 170 /*! 171 * True if the block has an alias (see set_block_alias). 172 */ alias_set()173 bool alias_set() const { return !d_symbol_alias.empty(); } 174 175 /*! 176 * Returns the block's alias as a string. 177 */ alias()178 std::string alias() const { return alias_set() ? d_symbol_alias : symbol_name(); } 179 180 /*! 181 * Returns the block's alias as PMT. 182 */ alias_pmt()183 pmt::pmt_t alias_pmt() const { return pmt::intern(alias()); } 184 185 /*! 186 * Set's a new alias for the block; also adds an entry into the 187 * block_registry to get the block using either the alias or the 188 * original symbol name. 189 */ 190 void set_block_alias(std::string name); 191 192 // ** Message passing interface ** 193 void message_port_register_in(pmt::pmt_t port_id); 194 void message_port_register_out(pmt::pmt_t port_id); 195 void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); 196 void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); 197 void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); 198 message_port_is_hier(pmt::pmt_t port_id)199 virtual bool message_port_is_hier(pmt::pmt_t port_id) 200 { 201 (void)port_id; 202 return false; 203 } message_port_is_hier_in(pmt::pmt_t port_id)204 virtual bool message_port_is_hier_in(pmt::pmt_t port_id) 205 { 206 (void)port_id; 207 return false; 208 } message_port_is_hier_out(pmt::pmt_t port_id)209 virtual bool message_port_is_hier_out(pmt::pmt_t port_id) 210 { 211 (void)port_id; 212 return false; 213 } 214 215 /*! 216 * \brief Get input message port names. 217 * 218 * Returns the available input message ports for a block. The 219 * return object is a PMT vector that is filled with PMT symbols. 220 */ 221 pmt::pmt_t message_ports_in(); 222 223 /*! 224 * \brief Get output message port names. 225 * 226 * Returns the available output message ports for a block. The 227 * return object is a PMT vector that is filled with PMT symbols. 228 */ 229 pmt::pmt_t message_ports_out(); 230 231 /*! 232 * Accept msg, place in queue, arrange for thread to be awakened if it's not already. 233 */ 234 void _post(pmt::pmt_t which_port, pmt::pmt_t msg); 235 236 //! is the queue empty? empty_p(pmt::pmt_t which_port)237 bool empty_p(pmt::pmt_t which_port) 238 { 239 if (msg_queue.find(which_port) == msg_queue.end()) 240 throw std::runtime_error("port does not exist!"); 241 return msg_queue[which_port].empty(); 242 } empty_p()243 bool empty_p() 244 { 245 bool rv = true; 246 BOOST_FOREACH (msg_queue_map_t::value_type& i, msg_queue) { 247 rv &= msg_queue[i.first].empty(); 248 } 249 return rv; 250 } 251 252 //! are all msg ports with handlers empty? empty_handled_p(pmt::pmt_t which_port)253 bool empty_handled_p(pmt::pmt_t which_port) 254 { 255 return (empty_p(which_port) || !has_msg_handler(which_port)); 256 } empty_handled_p()257 bool empty_handled_p() 258 { 259 bool rv = true; 260 BOOST_FOREACH (msg_queue_map_t::value_type& i, msg_queue) { 261 rv &= empty_handled_p(i.first); 262 } 263 return rv; 264 } 265 266 //! How many messages in the queue? nmsgs(pmt::pmt_t which_port)267 size_t nmsgs(pmt::pmt_t which_port) 268 { 269 if (msg_queue.find(which_port) == msg_queue.end()) 270 throw std::runtime_error("port does not exist!"); 271 return msg_queue[which_port].size(); 272 } 273 274 //| Acquires and release the mutex 275 void insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg); 276 /*! 277 * \returns returns pmt at head of queue or pmt::pmt_t() if empty. 278 */ 279 pmt::pmt_t delete_head_nowait(pmt::pmt_t which_port); 280 get_iterator(pmt::pmt_t which_port)281 msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) 282 { 283 return msg_queue[which_port].begin(); 284 } 285 erase_msg(pmt::pmt_t which_port,msg_queue_t::iterator it)286 void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) 287 { 288 msg_queue[which_port].erase(it); 289 } 290 has_msg_port(pmt::pmt_t which_port)291 virtual bool has_msg_port(pmt::pmt_t which_port) 292 { 293 if (msg_queue.find(which_port) != msg_queue.end()) { 294 return true; 295 } 296 if (pmt::dict_has_key(d_message_subscribers, which_port)) { 297 return true; 298 } 299 return false; 300 } 301 get_msg_map(void)302 const msg_queue_map_t& get_msg_map(void) const { return msg_queue; } 303 304 #ifdef GR_CTRLPORT 305 /*! 306 * \brief Add an RPC variable (get or set). 307 * 308 * Using controlport, we create new getters/setters and need to 309 * store them. Each block has a vector to do this, and these never 310 * need to be accessed again once they are registered with the RPC 311 * backend. This function takes a 312 * boost::shared_sptr<rpcbasic_base> so that when the block is 313 * deleted, all RPC registered variables are cleaned up. 314 * 315 * \param s an rpcbasic_sptr of the new RPC variable register to store. 316 */ add_rpc_variable(rpcbasic_sptr s)317 void add_rpc_variable(rpcbasic_sptr s) { d_rpc_vars.push_back(s); } 318 #endif /* GR_CTRLPORT */ 319 320 /*! 321 * \brief Set up the RPC registered variables. 322 * 323 * This must be overloaded by a block that wants to use 324 * controlport. This is where rpcbasic_register_{get,set} pointers 325 * are created, which then get wrapped as shared pointers 326 * (rpcbasic_sptr(...)) and stored using add_rpc_variable. 327 */ setup_rpc()328 virtual void setup_rpc(){}; 329 330 /*! 331 * \brief Ask if this block has been registered to the RPC. 332 * 333 * We can only register a block once, so we use this to protect us 334 * from calling it multiple times. 335 */ is_rpc_set()336 bool is_rpc_set() { return d_rpc_set; } 337 338 /*! 339 * \brief When the block is registered with the RPC, set this. 340 */ rpc_set()341 void rpc_set() { d_rpc_set = true; } 342 343 /*! 344 * \brief Confirm that ninputs and noutputs is an acceptable combination. 345 * 346 * \param ninputs number of input streams connected 347 * \param noutputs number of output streams connected 348 * 349 * \returns true if this is a valid configuration for this block. 350 * 351 * This function is called by the runtime system whenever the 352 * topology changes. Most classes do not need to override this. 353 * This check is in addition to the constraints specified by the 354 * input and output gr::io_signatures. 355 */ check_topology(int ninputs,int noutputs)356 virtual bool check_topology(int ninputs, int noutputs) 357 { 358 (void)ninputs; 359 (void)noutputs; 360 return true; 361 } 362 363 /*! 364 * \brief Set the callback that is fired when messages are available. 365 * 366 * \p msg_handler can be any kind of function pointer or function object 367 * that has the signature: 368 * <pre> 369 * void msg_handler(pmt::pmt msg); 370 * </pre> 371 * 372 * (You may want to use boost::bind to massage your callable into 373 * the correct form. See gr::blocks::nop for an example that sets 374 * up a class method as the callback.) 375 * 376 * Blocks that desire to handle messages must call this method in 377 * their constructors to register the handler that will be invoked 378 * when messages are available. 379 * 380 * If the block inherits from block, the runtime system will 381 * ensure that msg_handler is called in a thread-safe manner, such 382 * that work and msg_handler will never be called concurrently. 383 * This allows msg_handler to update state variables without 384 * having to worry about thread-safety issues with work, 385 * general_work or another invocation of msg_handler. 386 * 387 * If the block inherits from hier_block2, the runtime system 388 * will ensure that no reentrant calls are made to msg_handler. 389 */ 390 template <typename T> set_msg_handler(pmt::pmt_t which_port,T msg_handler)391 void set_msg_handler(pmt::pmt_t which_port, T msg_handler) 392 { 393 if (msg_queue.find(which_port) == msg_queue.end()) { 394 throw std::runtime_error( 395 "attempt to set_msg_handler() on bad input message port!"); 396 } 397 d_msg_handlers[which_port] = msg_handler_t(msg_handler); 398 } 399 400 virtual void set_processor_affinity(const std::vector<int>& mask) = 0; 401 402 virtual void unset_processor_affinity() = 0; 403 404 virtual std::vector<int> processor_affinity() = 0; 405 406 virtual void set_log_level(std::string level) = 0; 407 408 virtual std::string log_level() = 0; 409 }; 410 411 inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs) 412 { 413 return lhs->unique_id() < rhs->unique_id(); 414 } 415 416 typedef std::vector<basic_block_sptr> basic_block_vector_t; 417 typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t; 418 419 GR_RUNTIME_API long basic_block_ncurrently_allocated(); 420 421 inline std::ostream& operator<<(std::ostream& os, basic_block_sptr basic_block) 422 { 423 os << basic_block->identifier(); 424 return os; 425 } 426 427 } /* namespace gr */ 428 429 #endif /* INCLUDED_GR_BASIC_BLOCK_H */ 430