1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
7 #include <uhd/exception.hpp>
8 #include <uhd/rfnoc/node.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhdlib/rfnoc/prop_accessor.hpp>
11 #include <boost/format.hpp>
12 #include <algorithm>
13 #include <iostream>
14 
15 using namespace uhd::rfnoc;
16 
17 dirtifier_t node_t::ALWAYS_DIRTY{};
18 
19 
node_t()20 node_t::node_t()
21 {
22     register_property(&ALWAYS_DIRTY);
23 }
24 
get_unique_id() const25 std::string node_t::get_unique_id() const
26 {
27     // TODO return something better
28     return str(boost::format("%08X") % this);
29 }
30 
get_property_ids() const31 std::vector<std::string> node_t::get_property_ids() const
32 {
33     std::lock_guard<std::mutex> _l(_prop_mutex);
34     if (_props.count(res_source_info::USER) == 0) {
35         return {};
36     }
37 
38     auto& user_props = _props.at(res_source_info::USER);
39     // TODO use a range here, we're not savages
40     std::vector<std::string> return_value(user_props.size());
41     for (size_t i = 0; i < user_props.size(); ++i) {
42         return_value[i] = user_props[i]->get_id();
43     }
44 
45     return return_value;
46 }
47 
set_properties(const uhd::device_addr_t & props,const size_t instance)48 void node_t::set_properties(const uhd::device_addr_t& props, const size_t instance)
49 {
50     for (const auto& key : props.keys()) {
51         std::string local_key  = key;
52         size_t local_instance  = instance;
53         const size_t colon_pos = key.find(':');
54         if (colon_pos != std::string::npos) {
55             // Extract the property ID and instance
56             local_key                 = key.substr(0, colon_pos);
57             std::string instance_part = key.substr(colon_pos + 1);
58             try {
59                 local_instance = std::stoi(instance_part);
60             } catch (...) {
61                 // If no number, or an invalid number is specified after the
62                 // colon, throw a value_error.
63                 throw uhd::value_error("Property id `" + local_key
64                                        + "' contains a malformed instance override!");
65             }
66         }
67 
68         property_base_t* prop_ref =
69             _find_property({res_source_info::USER, local_instance}, local_key);
70         if (!prop_ref) {
71             RFNOC_LOG_WARNING("set_properties() cannot set property `"
72                               << local_key << "': No such property.");
73             continue;
74         }
75         auto prop_access = _request_property_access(prop_ref, property_base_t::RW);
76         prop_ref->set_from_str(props.get(key));
77     }
78 
79     // Now trigger a property resolution. If other properties depend on modified
80     // properties, they will be updated.
81     resolve_all();
82 }
83 
set_command_time(uhd::time_spec_t time,const size_t instance)84 void node_t::set_command_time(uhd::time_spec_t time, const size_t instance)
85 {
86     if (_cmd_timespecs.size() <= instance) {
87         _cmd_timespecs.resize(instance + 1, uhd::time_spec_t(0.0));
88     }
89 
90     _cmd_timespecs[instance] = time;
91 }
92 
get_command_time(const size_t instance) const93 uhd::time_spec_t node_t::get_command_time(const size_t instance) const
94 {
95     if (instance >= _cmd_timespecs.size()) {
96         return uhd::time_spec_t::ASAP;
97     }
98 
99     return _cmd_timespecs.at(instance);
100 }
101 
clear_command_time(const size_t instance)102 void node_t::clear_command_time(const size_t instance)
103 {
104     set_command_time(uhd::time_spec_t(0.0), instance);
105 }
106 
107 /*** Protected methods *******************************************************/
register_property(property_base_t * prop,resolve_callback_t && clean_callback)108 void node_t::register_property(property_base_t* prop, resolve_callback_t&& clean_callback)
109 {
110     std::lock_guard<std::mutex> _l(_prop_mutex);
111     const auto src_type = prop->get_src_info().type;
112 
113     // If the map is empty for this source type, create an empty vector
114     if (_props.count(src_type) == 0) {
115         _props[src_type] = {};
116     }
117 
118     auto prop_already_registered = [prop](const property_base_t* existing_prop) {
119         return (prop == existing_prop)
120                || (prop->get_src_info() == existing_prop->get_src_info()
121                       && prop->get_id() == existing_prop->get_id());
122     };
123     if (!filter_props(prop_already_registered).empty()) {
124         throw uhd::runtime_error(std::string("Attempting to double-register property: ")
125                                  + prop->get_id() + "[" + prop->get_src_info().to_string()
126                                  + "]");
127     }
128 
129     _props[src_type].push_back(prop);
130     if (clean_callback) {
131         _clean_cb_registry[prop] = std::move(clean_callback);
132     }
133 
134     prop_accessor_t{}.set_access(prop, property_base_t::RW);
135 }
136 
add_property_resolver(prop_ptrs_t && inputs,prop_ptrs_t && outputs,resolver_fn_t && resolver_fn)137 void node_t::add_property_resolver(
138     prop_ptrs_t&& inputs, prop_ptrs_t&& outputs, resolver_fn_t&& resolver_fn)
139 {
140     std::lock_guard<std::mutex> _l(_prop_mutex);
141 
142     // Sanity check: All inputs and outputs must be registered properties
143     auto prop_is_registered = [this](property_base_t* prop) -> bool {
144         return bool(this->_find_property(prop->get_src_info(), prop->get_id()));
145     };
146     for (const auto& prop : inputs) {
147         if (!prop_is_registered(prop)) {
148             throw uhd::runtime_error(
149                 std::string("Cannot add property resolver, input property ")
150                 + prop->get_id() + " is not registered!");
151         }
152     }
153     for (const auto& prop : outputs) {
154         if (!prop_is_registered(prop)) {
155             throw uhd::runtime_error(
156                 std::string("Cannot add property resolver, output property ")
157                 + prop->get_id() + " is not registered!");
158         }
159     }
160 
161     // All good, we can store it
162     _prop_resolvers.push_back(std::make_tuple(std::forward<prop_ptrs_t>(inputs),
163         std::forward<prop_ptrs_t>(outputs),
164         std::forward<resolver_fn_t>(resolver_fn)));
165 }
166 
167 
set_prop_forwarding_policy(forwarding_policy_t policy,const std::string & prop_id)168 void node_t::set_prop_forwarding_policy(
169     forwarding_policy_t policy, const std::string& prop_id)
170 {
171     _prop_fwd_policies[prop_id] = policy;
172 }
173 
set_prop_forwarding_map(const forwarding_map_t & map)174 void node_t::set_prop_forwarding_map(const forwarding_map_t& map)
175 {
176     _prop_fwd_map = map;
177 }
178 
register_action_handler(const std::string & id,action_handler_t && handler)179 void node_t::register_action_handler(const std::string& id, action_handler_t&& handler)
180 {
181     if (_action_handlers.count(id)) {
182         _action_handlers.erase(id);
183     }
184     _action_handlers.emplace(id, std::move(handler));
185 }
186 
set_action_forwarding_policy(node_t::forwarding_policy_t policy,const std::string & action_key)187 void node_t::set_action_forwarding_policy(
188     node_t::forwarding_policy_t policy, const std::string& action_key)
189 {
190     _action_fwd_policies[action_key] = policy;
191 }
192 
set_action_forwarding_map(const forwarding_map_t & map)193 void node_t::set_action_forwarding_map(const forwarding_map_t& map)
194 {
195     _action_fwd_map = map;
196 }
197 
post_action(const res_source_info & edge_info,action_info::sptr action)198 void node_t::post_action(const res_source_info& edge_info, action_info::sptr action)
199 {
200     _post_action_cb(edge_info, action);
201 }
202 
check_topology(const std::vector<size_t> & connected_inputs,const std::vector<size_t> & connected_outputs)203 bool node_t::check_topology(const std::vector<size_t>& connected_inputs,
204     const std::vector<size_t>& connected_outputs)
205 {
206     for (size_t port : connected_inputs) {
207         if (port >= get_num_input_ports()) {
208             return false;
209         }
210     }
211     for (size_t port : connected_outputs) {
212         if (port >= get_num_output_ports()) {
213             return false;
214         }
215     }
216 
217     return true;
218 }
219 
220 /*** Private methods *********************************************************/
_find_property(res_source_info src_info,const std::string & id) const221 property_base_t* node_t::_find_property(
222     res_source_info src_info, const std::string& id) const
223 {
224     for (const auto& type_prop_pair : _props) {
225         if (type_prop_pair.first != src_info.type) {
226             continue;
227         }
228         for (const auto& prop : type_prop_pair.second) {
229             if (prop->get_id() == id && prop->get_src_info() == src_info) {
230                 return prop;
231             }
232         }
233     }
234 
235     return nullptr;
236 }
237 
_request_property_access(property_base_t * prop,property_base_t::access_t access) const238 uhd::utils::scope_exit::uptr node_t::_request_property_access(
239     property_base_t* prop, property_base_t::access_t access) const
240 {
241     return prop_accessor_t{}.get_scoped_prop_access(*prop, access);
242 }
243 
244 
inject_edge_property(property_base_t * blueprint,res_source_info new_src_info)245 property_base_t* node_t::inject_edge_property(
246     property_base_t* blueprint, res_source_info new_src_info)
247 {
248     // Check if a property already exists which matches the new property
249     // requirements. If so, we can return early:
250     auto new_prop = _find_property(new_src_info, blueprint->get_id());
251     if (new_prop) {
252         return new_prop;
253     }
254 
255     // We need to create a new property and stash it away:
256     new_prop = [&]() -> property_base_t* {
257         auto prop = blueprint->clone(new_src_info);
258         auto ptr  = prop.get();
259         _dynamic_props.emplace(std::move(prop));
260         return ptr;
261     }();
262     register_property(new_prop);
263 
264     // Collect some info on how to do the forwarding:
265     const auto fwd_policy = [&](const std::string& id) {
266         if (_prop_fwd_policies.count(id)) {
267             return _prop_fwd_policies.at(id);
268         }
269         return _prop_fwd_policies.at("");
270     }(new_prop->get_id());
271     const size_t port_idx = new_prop->get_src_info().instance;
272     const auto port_type  = new_prop->get_src_info().type;
273     UHD_ASSERT_THROW(port_type == res_source_info::INPUT_EDGE
274                      || port_type == res_source_info::OUTPUT_EDGE);
275 
276     // Now comes the hard part: Figure out which other properties need to be
277     // created, and which resolvers need to be instantiated
278     if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
279         // Figure out if there's an opposite port
280         const auto opposite_port_type = res_source_info::invert_edge(port_type);
281         if (_has_port({opposite_port_type, port_idx})) {
282             // Make sure that the other side's property exists:
283             // This is a safe recursion, because we've already created and
284             // registered this property.
285             auto opposite_prop =
286                 inject_edge_property(new_prop, {opposite_port_type, port_idx});
287             // Now add a resolver that will always forward the value from this
288             // property to the other one.
289             add_property_resolver(
290                 {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() {
291                     prop_accessor_t{}.forward<false>(new_prop, opposite_prop);
292                 });
293         }
294     }
295     if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
296         const auto opposite_port_type = res_source_info::invert_edge(port_type);
297         const size_t num_ports        = opposite_port_type == res_source_info::INPUT_EDGE
298                                      ? get_num_input_ports()
299                                      : get_num_output_ports();
300         for (size_t i = 0; i < num_ports; i++) {
301             auto opposite_prop = inject_edge_property(new_prop, {opposite_port_type, i});
302             // Now add a resolver that will always forward the value from this
303             // property to the other one.
304             add_property_resolver(
305                 {new_prop}, {opposite_prop}, [new_prop, opposite_prop]() {
306                     prop_accessor_t{}.forward<false>(new_prop, opposite_prop);
307                 });
308         }
309     }
310     if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
311         || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
312         // Loop through all other ports, make sure those properties exist
313         for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
314              other_port_idx++) {
315             if (port_type == res_source_info::INPUT_EDGE && other_port_idx == port_idx) {
316                 continue;
317             }
318             inject_edge_property(new_prop, {res_source_info::INPUT_EDGE, other_port_idx});
319         }
320         // Now add a dynamic resolver that will update all input properties.
321         // In order to keep this code simple, we bypass the write list and
322         // get access via the prop_accessor.
323         add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() {
324             for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
325                  other_port_idx++) {
326                 if (other_port_idx == port_idx) {
327                     continue;
328                 }
329                 auto prop = _find_property(
330                     {res_source_info::INPUT_EDGE, other_port_idx}, new_prop->get_id());
331                 if (prop) {
332                     prop_accessor_t{}.forward<false>(new_prop, prop);
333                 }
334             }
335         });
336     }
337     if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
338         || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
339         // Loop through all other ports, make sure those properties exist
340         for (size_t other_port_idx = 0; other_port_idx < get_num_output_ports();
341              other_port_idx++) {
342             if (port_type == res_source_info::OUTPUT_EDGE && other_port_idx == port_idx) {
343                 continue;
344             }
345             inject_edge_property(
346                 new_prop, {res_source_info::OUTPUT_EDGE, other_port_idx});
347         }
348         // Now add a dynamic resolver that will update all input properties.
349         // In order to keep this code simple, we bypass the write list and
350         // get access via the prop_accessor.
351         add_property_resolver({new_prop}, {/* empty */}, [this, new_prop, port_idx]() {
352             for (size_t other_port_idx = 0; other_port_idx < get_num_input_ports();
353                  other_port_idx++) {
354                 if (other_port_idx == port_idx) {
355                     continue;
356                 }
357                 auto prop = _find_property(
358                     {res_source_info::OUTPUT_EDGE, other_port_idx}, new_prop->get_id());
359                 if (prop) {
360                     prop_accessor_t{}.forward<false>(new_prop, prop);
361                 }
362             }
363         });
364     }
365     if (fwd_policy == forwarding_policy_t::USE_MAP) {
366         const auto src_info   = new_prop->get_src_info();
367         const auto& map_entry = _prop_fwd_map.find(src_info);
368         if (map_entry != _prop_fwd_map.end()) {
369             for (const auto& dst : map_entry->second) {
370                 if (!_has_port(dst)) {
371                     throw uhd::rfnoc_error("Destination port " + dst.to_string()
372                                            + " in prop map does not exist");
373                 }
374                 auto next_prop = inject_edge_property(new_prop, dst);
375                 // Now add a resolver that will always forward the value from this
376                 // property to the other one.
377                 add_property_resolver({new_prop}, {next_prop}, [new_prop, next_prop]() {
378                     prop_accessor_t{}.forward<false>(new_prop, next_prop);
379                 });
380             }
381         } else {
382             RFNOC_LOG_TRACE("Dropping incoming prop on " << src_info.to_string()
383                                                          << " (no destinations in map)");
384         }
385     }
386 
387     return new_prop;
388 }
389 
390 
init_props()391 void node_t::init_props()
392 {
393     std::lock_guard<std::mutex> _l(_prop_mutex);
394 
395     prop_accessor_t prop_accessor{};
396 
397     for (auto& resolver_tuple : _prop_resolvers) {
398         // 1) Set all outputs to RWLOCKED
399         auto& outputs = std::get<1>(resolver_tuple);
400         for (auto& output : outputs) {
401             prop_accessor.set_access(output, property_base_t::RWLOCKED);
402         }
403 
404         // 2) Run the resolver
405         try {
406             std::get<2>(resolver_tuple)();
407         } catch (const uhd::resolve_error& ex) {
408             UHD_LOGGER_WARNING(get_unique_id())
409                 << "Failed to initialize node. Most likely cause: Inconsistent default "
410                    "values. Resolver threw this error: "
411                 << ex.what();
412             // throw uhd::runtime_error(std::string("Failed to initialize node ") +
413             // get_unique_id());
414         }
415 
416         // 3) Set outputs back to RO
417         for (auto& output : outputs) {
418             prop_accessor.set_access(output, property_base_t::RO);
419         }
420     }
421 
422     // 4) Mark properties as clean and read-only
423     clean_props();
424 }
425 
426 
resolve_props()427 void node_t::resolve_props()
428 {
429     prop_accessor_t prop_accessor{};
430     const prop_ptrs_t initial_dirty_props =
431         filter_props([](property_base_t* prop) { return prop->is_dirty(); });
432     std::list<property_base_t*> all_dirty_props(
433         initial_dirty_props.cbegin(), initial_dirty_props.cend());
434     prop_ptrs_t processed_props{};
435     prop_ptrs_t written_props{};
436     RFNOC_LOG_TRACE("Locally resolving " << all_dirty_props.size()
437                                          << " dirty properties plus dependencies.");
438 
439     // Loop through all dirty properties. The list can be amended during the
440     // loop execution.
441     for (auto it = all_dirty_props.begin(); it != all_dirty_props.end(); ++it) {
442         auto current_input_prop = *it;
443         if (processed_props.count(current_input_prop)) {
444             continue;
445         }
446         // Find all resolvers that take this dirty property as an input:
447         for (auto& resolver_tuple : _prop_resolvers) {
448             auto& inputs  = std::get<0>(resolver_tuple);
449             auto& outputs = std::get<1>(resolver_tuple);
450             if (!inputs.count(current_input_prop)) {
451                 continue;
452             }
453 
454             // Enable outputs
455             std::vector<uhd::utils::scope_exit::uptr> access_holder;
456             access_holder.reserve(outputs.size());
457             for (auto& output : outputs) {
458                 access_holder.emplace_back(prop_accessor.get_scoped_prop_access(*output,
459                     written_props.count(output) ? property_base_t::access_t::RWLOCKED
460                                                 : property_base_t::access_t::RW));
461             }
462 
463             // Run resolver
464             std::get<2>(resolver_tuple)();
465 
466             // Take note of outputs
467             written_props.insert(outputs.cbegin(), outputs.cend());
468 
469             // Add all outputs that are dirty to the list, unless they have
470             // already been processed
471             for (auto& output_prop : outputs) {
472                 if (output_prop->is_dirty() && processed_props.count(output_prop) == 0) {
473                     all_dirty_props.push_back(output_prop);
474                 }
475             }
476 
477             // RW or RWLOCKED gets released here as access_holder goes out of scope.
478         }
479         processed_props.insert(current_input_prop);
480     }
481 }
482 
resolve_all()483 void node_t::resolve_all()
484 {
485     _resolve_all_cb();
486 }
487 
488 
clean_props()489 void node_t::clean_props()
490 {
491     prop_accessor_t prop_accessor{};
492     for (const auto& type_prop_pair : _props) {
493         for (const auto& prop : type_prop_pair.second) {
494             if (prop->is_valid() && prop->is_dirty() && _clean_cb_registry.count(prop)) {
495                 _clean_cb_registry.at(prop)();
496             }
497             prop_accessor.mark_clean(*prop);
498             prop_accessor.set_access(prop, property_base_t::RO);
499         }
500     }
501 }
502 
503 
forward_edge_property(property_base_t * incoming_prop,const size_t incoming_port)504 void node_t::forward_edge_property(
505     property_base_t* incoming_prop, const size_t incoming_port)
506 {
507     UHD_ASSERT_THROW(
508         incoming_prop->get_src_info().type == res_source_info::INPUT_EDGE
509         || incoming_prop->get_src_info().type == res_source_info::OUTPUT_EDGE);
510     RFNOC_LOG_TRACE("Incoming edge property: `"
511                     << incoming_prop->get_id()
512                     << "`, source info: " << incoming_prop->get_src_info().to_string());
513 
514     // Don't forward properties that are not yet valid
515     if (!incoming_prop->is_valid()) {
516         RFNOC_LOG_TRACE("Skipped empty edge property: `"
517                         << incoming_prop->get_id() << "`, source info: "
518                         << incoming_prop->get_src_info().to_string());
519         return;
520     }
521 
522     // The source type of my local prop (it's the opposite of the source type
523     // of incoming_prop)
524     const auto prop_src_type =
525         res_source_info::invert_edge(incoming_prop->get_src_info().type);
526     // Set of local properties that match incoming_prop. It can be an empty set,
527     // or, if the node is misconfigured, a set with more than one entry. Or, if
528     // all is as expected, it's a set with a single entry.
529     auto local_prop_set = filter_props(
530         [prop_src_type, incoming_prop, incoming_port](property_base_t* prop) -> bool {
531             return prop->get_src_info().type == prop_src_type
532                    && prop->get_src_info().instance == incoming_port
533                    && prop->get_id() == incoming_prop->get_id();
534         });
535 
536     // If there is no such property, we're forwarding a new property
537     if (local_prop_set.empty()) {
538         RFNOC_LOG_TRACE(
539             "Received unknown incoming edge prop: " << incoming_prop->get_id());
540         local_prop_set.emplace(
541             inject_edge_property(incoming_prop, {prop_src_type, incoming_port}));
542     }
543     // There must be either zero results, or one
544     UHD_ASSERT_THROW(local_prop_set.size() == 1);
545 
546     auto local_prop = *local_prop_set.begin();
547 
548     prop_accessor_t prop_accessor{};
549     prop_accessor.forward<false>(incoming_prop, local_prop);
550 }
551 
receive_action(const res_source_info & src_info,action_info::sptr action)552 void node_t::receive_action(const res_source_info& src_info, action_info::sptr action)
553 {
554     std::lock_guard<std::mutex> l(_action_mutex);
555     // See if the user defined an action handler for us:
556     if (_action_handlers.count(action->key)) {
557         _action_handlers.at(action->key)(src_info, action);
558         return;
559     }
560 
561     // We won't forward actions if they were for us
562     if (src_info.type == res_source_info::USER) {
563         RFNOC_LOG_TRACE("Dropping USER action " << action->key << "#" << action->id);
564         return;
565     }
566 
567     // Otherwise, we need to figure out the correct default action handling:
568     const auto fwd_policy = [&](const std::string& id) {
569         if (_action_fwd_policies.count(id)) {
570             return _action_fwd_policies.at(id);
571         }
572         return _action_fwd_policies.at("");
573     }(action->key);
574 
575     // Now implement custom forwarding for all forwarding policies:
576     if (fwd_policy == forwarding_policy_t::DROP) {
577         RFNOC_LOG_TRACE("Dropping action " << action->key);
578     }
579     if (fwd_policy == forwarding_policy_t::ONE_TO_ONE) {
580         RFNOC_LOG_TRACE("Forwarding action " << action->key << " to opposite port");
581         const res_source_info dst_info{
582             res_source_info::invert_edge(src_info.type), src_info.instance};
583         if (_has_port(dst_info)) {
584             post_action(dst_info, action);
585         }
586     }
587     if (fwd_policy == forwarding_policy_t::ONE_TO_FAN) {
588         RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all opposite ports");
589         const auto new_edge_type = res_source_info::invert_edge(src_info.type);
590         const size_t num_ports   = new_edge_type == res_source_info::INPUT_EDGE
591                                      ? get_num_input_ports()
592                                      : get_num_output_ports();
593         for (size_t i = 0; i < num_ports; i++) {
594             post_action({new_edge_type, i}, action);
595         }
596     }
597     if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
598         || fwd_policy == forwarding_policy_t::ONE_TO_ALL_IN) {
599         RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all input ports");
600         for (size_t i = 0; i < get_num_input_ports(); i++) {
601             if (src_info.type == res_source_info::INPUT_EDGE && i == src_info.instance) {
602                 continue;
603             }
604             post_action({res_source_info::INPUT_EDGE, i}, action);
605         }
606     }
607     if (fwd_policy == forwarding_policy_t::ONE_TO_ALL
608         || fwd_policy == forwarding_policy_t::ONE_TO_ALL_OUT) {
609         RFNOC_LOG_TRACE("Forwarding action " << action->key << " to all output ports");
610         for (size_t i = 0; i < get_num_output_ports(); i++) {
611             if (src_info.type == res_source_info::OUTPUT_EDGE && i == src_info.instance) {
612                 continue;
613             }
614             post_action({res_source_info::OUTPUT_EDGE, i}, action);
615         }
616     }
617     if (fwd_policy == forwarding_policy_t::USE_MAP) {
618         const auto& map_entry = _action_fwd_map.find(src_info);
619         if (map_entry != _action_fwd_map.end()) {
620             for (const auto& dst : map_entry->second) {
621                 if (!_has_port(dst)) {
622                     throw uhd::rfnoc_error("Destination port " + dst.to_string()
623                                            + " in action map does not exist");
624                 }
625                 RFNOC_LOG_TRACE(
626                     "Forwarding action " << action->key << " to " << dst.to_string());
627                 post_action(dst, action);
628             }
629         } else {
630             RFNOC_LOG_TRACE("Dropping action " << action->key << " on "
631                                                << src_info.to_string()
632                                                << " (no destinations in map)");
633         }
634     }
635 }
636 
shutdown()637 void node_t::shutdown()
638 {
639     RFNOC_LOG_DEBUG("shutdown() not implemented.");
640 }
641 
_has_port(const res_source_info & port_info) const642 bool node_t::_has_port(const res_source_info& port_info) const
643 {
644     return (port_info.type == res_source_info::INPUT_EDGE
645                && port_info.instance < get_num_input_ports())
646            || (port_info.type == res_source_info::OUTPUT_EDGE
647                   && port_info.instance < get_num_output_ports());
648 }
649