1 /* -*- c++ -*- */
2 /*
3 * Copyright 2006,2012-2013 Free Software Foundation, Inc.
4 *
5 * This file is part of GNU Radio
6 *
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
10 * any later version.
11 *
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with GNU Radio; see the file COPYING. If not, write to
19 * the Free Software Foundation, Inc., 51 Franklin Street,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <gnuradio/basic_block.h>
28 #include <gnuradio/block_registry.h>
29 #include <gnuradio/logger.h>
30 #include <iostream>
31 #include <sstream>
32 #include <stdexcept>
33
34 namespace gr {
35
namespace {

// Source of the unique ids handed to blocks by the basic_block
// constructor.
long s_next_id = 0;

// Number of basic_block objects alive right now: raised by the
// constructor, lowered by the destructor.
long s_ncurrently_allocated = 0;

} // anonymous namespace

// Report how many basic_block instances currently exist.
long basic_block_ncurrently_allocated()
{
    return s_ncurrently_allocated;
}
40
basic_block(const std::string & name,io_signature::sptr input_signature,io_signature::sptr output_signature)41 basic_block::basic_block(const std::string& name,
42 io_signature::sptr input_signature,
43 io_signature::sptr output_signature)
44 : d_name(name),
45 d_input_signature(input_signature),
46 d_output_signature(output_signature),
47 d_unique_id(s_next_id++),
48 d_symbolic_id(global_block_registry.block_register(this)),
49 d_symbol_name(global_block_registry.register_symbolic_name(this)),
50 d_color(WHITE),
51 d_rpc_set(false),
52 d_message_subscribers(pmt::make_dict())
53 {
54 s_ncurrently_allocated++;
55 }
56
~basic_block()57 basic_block::~basic_block()
58 {
59 s_ncurrently_allocated--;
60 global_block_registry.block_unregister(this);
61 }
62
to_basic_block()63 basic_block_sptr basic_block::to_basic_block() { return shared_from_this(); }
64
set_block_alias(std::string name)65 void basic_block::set_block_alias(std::string name)
66 {
67 // Only keep one alias'd name around for each block. If we don't
68 // have an alias, add it; if we do, update the entry in the
69 // registry.
70 if (alias_set())
71 global_block_registry.update_symbolic_name(this, name);
72 else
73 global_block_registry.register_symbolic_name(this, name);
74
75 // set the block's alias
76 d_symbol_alias = name;
77 update_logger_alias(symbol_name(), d_symbol_alias);
78 }
79
80 // ** Message passing interface **
81
82 // - register a new input message port
message_port_register_in(pmt::pmt_t port_id)83 void basic_block::message_port_register_in(pmt::pmt_t port_id)
84 {
85 if (!pmt::is_symbol(port_id)) {
86 throw std::runtime_error("message_port_register_in: bad port id");
87 }
88 msg_queue[port_id] = msg_queue_t();
89 msg_queue_ready[port_id] =
90 boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
91 }
92
message_ports_in()93 pmt::pmt_t basic_block::message_ports_in()
94 {
95 pmt::pmt_t port_names = pmt::make_vector(msg_queue.size(), pmt::PMT_NIL);
96 msg_queue_map_itr itr = msg_queue.begin();
97 for (size_t i = 0; i < msg_queue.size(); i++) {
98 pmt::vector_set(port_names, i, (*itr).first);
99 itr++;
100 }
101 return port_names;
102 }
103
104 // - register a new output message port
message_port_register_out(pmt::pmt_t port_id)105 void basic_block::message_port_register_out(pmt::pmt_t port_id)
106 {
107 if (!pmt::is_symbol(port_id)) {
108 throw std::runtime_error("message_port_register_out: bad port id");
109 }
110 if (pmt::dict_has_key(d_message_subscribers, port_id)) {
111 throw std::runtime_error("message_port_register_out: port already in use");
112 }
113 d_message_subscribers = pmt::dict_add(d_message_subscribers, port_id, pmt::PMT_NIL);
114 }
115
message_ports_out()116 pmt::pmt_t basic_block::message_ports_out()
117 {
118 size_t len = pmt::length(d_message_subscribers);
119 pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL);
120 pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers);
121 for (size_t i = 0; i < len; i++) {
122 pmt::vector_set(port_names, i, pmt::nth(i, keys));
123 }
124 return port_names;
125 }
126
127 // - publish a message on a message port
message_port_pub(pmt::pmt_t port_id,pmt::pmt_t msg)128 void basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
129 {
130 if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
131 throw std::runtime_error("port does not exist");
132 }
133
134 pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
135 // iterate through subscribers on port
136 while (pmt::is_pair(currlist)) {
137 pmt::pmt_t target = pmt::car(currlist);
138
139 pmt::pmt_t block = pmt::car(target);
140 pmt::pmt_t port = pmt::cdr(target);
141
142 currlist = pmt::cdr(currlist);
143 basic_block_sptr blk = global_block_registry.block_lookup(block);
144 // blk->post(msg);
145 blk->post(port, msg);
146 }
147 }
148
149 // - subscribe to a message port
message_port_sub(pmt::pmt_t port_id,pmt::pmt_t target)150 void basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
151 {
152 if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
153 std::stringstream ss;
154 ss << "Port does not exist: \"" << pmt::write_string(port_id)
155 << "\" on block: " << pmt::write_string(target) << std::endl;
156 throw std::runtime_error(ss.str());
157 }
158 pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
159
160 // ignore re-adds of the same target
161 if (!pmt::list_has(currlist, target))
162 d_message_subscribers = pmt::dict_add(
163 d_message_subscribers, port_id, pmt::list_add(currlist, target));
164 }
165
message_port_unsub(pmt::pmt_t port_id,pmt::pmt_t target)166 void basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
167 {
168 if (!pmt::dict_has_key(d_message_subscribers, port_id)) {
169 std::stringstream ss;
170 ss << "Port does not exist: \"" << pmt::write_string(port_id)
171 << "\" on block: " << pmt::write_string(target) << std::endl;
172 throw std::runtime_error(ss.str());
173 }
174
175 // ignore unsubs of unknown targets
176 pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL);
177 d_message_subscribers =
178 pmt::dict_add(d_message_subscribers, port_id, pmt::list_rm(currlist, target));
179 }
180
_post(pmt::pmt_t which_port,pmt::pmt_t msg)181 void basic_block::_post(pmt::pmt_t which_port, pmt::pmt_t msg)
182 {
183 insert_tail(which_port, msg);
184 }
185
insert_tail(pmt::pmt_t which_port,pmt::pmt_t msg)186 void basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
187 {
188 gr::thread::scoped_lock guard(mutex);
189
190 if ((msg_queue.find(which_port) == msg_queue.end()) ||
191 (msg_queue_ready.find(which_port) == msg_queue_ready.end())) {
192 std::cout << "target port = " << pmt::symbol_to_string(which_port) << std::endl;
193 throw std::runtime_error("attempted to insert_tail on invalid queue!");
194 }
195
196 msg_queue[which_port].push_back(msg);
197 msg_queue_ready[which_port]->notify_one();
198
199 // wake up thread if BLKD_IN or BLKD_OUT
200 global_block_registry.notify_blk(d_symbol_name);
201 }
202
delete_head_nowait(pmt::pmt_t which_port)203 pmt::pmt_t basic_block::delete_head_nowait(pmt::pmt_t which_port)
204 {
205 gr::thread::scoped_lock guard(mutex);
206
207 if (empty_p(which_port)) {
208 return pmt::pmt_t();
209 }
210
211 pmt::pmt_t m(msg_queue[which_port].front());
212 msg_queue[which_port].pop_front();
213
214 return m;
215 }
216
message_subscribers(pmt::pmt_t port)217 pmt::pmt_t basic_block::message_subscribers(pmt::pmt_t port)
218 {
219 return pmt::dict_ref(d_message_subscribers, port, pmt::PMT_NIL);
220 }
221
222
223 } /* namespace gr */
224