1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2012-2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING.  If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include <gnuradio/basic_block.h>
28 #include <gnuradio/block_registry.h>
29 #include <gnuradio/logger.h>
30 #include <iostream>
31 #include <sstream>
32 #include <stdexcept>
33 
34 namespace gr {
35 
// Source of the unique id given to each basic_block as it is constructed.
static long s_next_id = 0;
// Running count of basic_block objects: bumped by the constructor and
// dropped again by the destructor.
static long s_ncurrently_allocated = 0;

// Report the current value of the live-instance counter.
long basic_block_ncurrently_allocated()
{
    return s_ncurrently_allocated;
}
40 
basic_block(const std::string & name,io_signature::sptr input_signature,io_signature::sptr output_signature)41 basic_block::basic_block(const std::string& name,
42                          io_signature::sptr input_signature,
43                          io_signature::sptr output_signature)
44     : d_name(name),
45       d_input_signature(input_signature),
46       d_output_signature(output_signature),
47       d_unique_id(s_next_id++),
48       d_symbolic_id(global_block_registry.block_register(this)),
49       d_symbol_name(global_block_registry.register_symbolic_name(this)),
50       d_color(WHITE),
51       d_rpc_set(false),
52       d_message_subscribers(pmt::make_dict())
53 {
54     s_ncurrently_allocated++;
55 }
56 
~basic_block()57 basic_block::~basic_block()
58 {
59     s_ncurrently_allocated--;
60     global_block_registry.block_unregister(this);
61 }
62 
to_basic_block()63 basic_block_sptr basic_block::to_basic_block() { return shared_from_this(); }
64 
set_block_alias(std::string name)65 void basic_block::set_block_alias(std::string name)
66 {
67     // Only keep one alias'd name around for each block. If we don't
68     // have an alias, add it; if we do, update the entry in the
69     // registry.
70     if (alias_set())
71         global_block_registry.update_symbolic_name(this, name);
72     else
73         global_block_registry.register_symbolic_name(this, name);
74 
75     // set the block's alias
76     d_symbol_alias = name;
77     update_logger_alias(symbol_name(), d_symbol_alias);
78 }
79 
80 // ** Message passing interface **
81 
82 //  - register a new input message port
message_port_register_in(pmt::pmt_t port_id)83 void basic_block::message_port_register_in(pmt::pmt_t port_id)
84 {
85     if (!pmt::is_symbol(port_id)) {
86         throw std::runtime_error("message_port_register_in: bad port id");
87     }
88     msg_queue[port_id] = msg_queue_t();
89     msg_queue_ready[port_id] =
90         boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
91 }
92 
message_ports_in()93 pmt::pmt_t basic_block::message_ports_in()
94 {
95     pmt::pmt_t port_names = pmt::make_vector(msg_queue.size(), pmt::PMT_NIL);
96     msg_queue_map_itr itr = msg_queue.begin();
97     for (size_t i = 0; i < msg_queue.size(); i++) {
98         pmt::vector_set(port_names, i, (*itr).first);
99         itr++;
100     }
101     return port_names;
102 }
103 
104 //  - register a new output message port
message_port_register_out(pmt::pmt_t port_id)105 void basic_block::message_port_register_out(pmt::pmt_t port_id)
106 {
107     if (!pmt::is_symbol(port_id)) {
108         throw std::runtime_error("message_port_register_out: bad port id");
109     }
110     if (pmt::dict_has_key(d_message_subscribers, port_id)) {
111         throw std::runtime_error("message_port_register_out: port already in use");
112     }
113     d_message_subscribers = pmt::dict_add(d_message_subscribers, port_id, pmt::PMT_NIL);
114 }
115 
message_ports_out()116 pmt::pmt_t basic_block::message_ports_out()
117 {
118     size_t len = pmt::length(d_message_subscribers);
119     pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL);
120     pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers);
121     for (size_t i = 0; i < len; i++) {
122         pmt::vector_set(port_names, i, pmt::nth(i, keys));
123     }
124     return port_names;
125 }
126 
127 //  - publish a message on a message port
message_port_pub(pmt::pmt_t port_id,pmt::pmt_t msg)128 void basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
129 {
130     if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
131         throw std::runtime_error("port does not exist");
132     }
133 
134     pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
135     // iterate through subscribers on port
136     while (pmt::is_pair(currlist)) {
137         pmt::pmt_t target = pmt::car(currlist);
138 
139         pmt::pmt_t block = pmt::car(target);
140         pmt::pmt_t port = pmt::cdr(target);
141 
142         currlist = pmt::cdr(currlist);
143         basic_block_sptr blk = global_block_registry.block_lookup(block);
144         // blk->post(msg);
145         blk->post(port, msg);
146     }
147 }
148 
149 //  - subscribe to a message port
message_port_sub(pmt::pmt_t port_id,pmt::pmt_t target)150 void basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
151 {
152     if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
153         std::stringstream ss;
154         ss << "Port does not exist: \"" << pmt::write_string(port_id)
155            << "\" on block: " << pmt::write_string(target) << std::endl;
156         throw std::runtime_error(ss.str());
157     }
158     pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
159 
160     // ignore re-adds of the same target
161     if (!pmt::list_has(currlist, target))
162         d_message_subscribers = pmt::dict_add(
163             d_message_subscribers, port_id, pmt::list_add(currlist, target));
164 }
165 
message_port_unsub(pmt::pmt_t port_id,pmt::pmt_t target)166 void basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
167 {
168     if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
169         std::stringstream ss;
170         ss << "Port does not exist: \"" << pmt::write_string(port_id)
171            << "\" on block: " << pmt::write_string(target) << std::endl;
172         throw std::runtime_error(ss.str());
173     }
174 
175     // ignore unsubs of unknown targets
176     pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
177     d_message_subscribers =
178         pmt::dict_add(d_message_subscribers, port_id, pmt::list_rm(currlist, target));
179 }
180 
_post(pmt::pmt_t which_port,pmt::pmt_t msg)181 void basic_block::_post(pmt::pmt_t which_port, pmt::pmt_t msg)
182 {
183     insert_tail(which_port, msg);
184 }
185 
insert_tail(pmt::pmt_t which_port,pmt::pmt_t msg)186 void basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
187 {
188     gr::thread::scoped_lock guard(mutex);
189 
190     if ((msg_queue.find(which_port) == msg_queue.end()) ||
191         (msg_queue_ready.find(which_port) == msg_queue_ready.end())) {
192         std::cout << "target port = " << pmt::symbol_to_string(which_port) << std::endl;
193         throw std::runtime_error("attempted to insert_tail on invalid queue!");
194     }
195 
196     msg_queue[which_port].push_back(msg);
197     msg_queue_ready[which_port]->notify_one();
198 
199     // wake up thread if BLKD_IN or BLKD_OUT
200     global_block_registry.notify_blk(d_symbol_name);
201 }
202 
delete_head_nowait(pmt::pmt_t which_port)203 pmt::pmt_t basic_block::delete_head_nowait(pmt::pmt_t which_port)
204 {
205     gr::thread::scoped_lock guard(mutex);
206 
207     if (empty_p(which_port)) {
208         return pmt::pmt_t();
209     }
210 
211     pmt::pmt_t m(msg_queue[which_port].front());
212     msg_queue[which_port].pop_front();
213 
214     return m;
215 }
216 
message_subscribers(pmt::pmt_t port)217 pmt::pmt_t basic_block::message_subscribers(pmt::pmt_t port)
218 {
219     return pmt::dict_ref(d_message_subscribers, port, pmt::PMT_NIL);
220 }
221 
222 
223 } /* namespace gr */
224