1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
7 #include <uhd/exception.hpp>
8 #include <uhd/utils/log.hpp>
9 #include <uhdlib/rfnoc/graph.hpp>
10 #include <uhdlib/rfnoc/node_accessor.hpp>
11 #include <boost/graph/filtered_graph.hpp>
12 #include <boost/graph/topological_sort.hpp>
13 #include <limits>
14 #include <utility>
15 
16 using namespace uhd::rfnoc;
17 using namespace uhd::rfnoc::detail;
18 
19 namespace {
20 
21 const std::string LOG_ID                 = "RFNOC::GRAPH::DETAIL";
22 constexpr unsigned MAX_ACTION_ITERATIONS = 200;
23 
24 /*! Helper function to pretty-print edge info
25  */
print_edge(graph_t::node_ref_t src,graph_t::node_ref_t dst,graph_t::graph_edge_t edge_info)26 std::string print_edge(
27     graph_t::node_ref_t src, graph_t::node_ref_t dst, graph_t::graph_edge_t edge_info)
28 {
29     return src->get_unique_id() + ":" + std::to_string(edge_info.src_port) + " -> "
30            + dst->get_unique_id() + ":" + std::to_string(edge_info.dst_port);
31 }
32 
33 /*! Return a list of dirty properties from a node
34  */
get_dirty_props(graph_t::node_ref_t node_ref)35 auto get_dirty_props(graph_t::node_ref_t node_ref)
36 {
37     using namespace uhd::rfnoc;
38     node_accessor_t node_accessor{};
39     return node_accessor.filter_props(node_ref, [](property_base_t* prop) {
40         return prop->is_dirty()
41                && prop->get_src_info().type != res_source_info::FRAMEWORK;
42     });
43 }
44 
45 } // namespace
46 
47 /*! Graph-filtering predicate to find dirty nodes only
48  */
49 struct graph_t::DirtyNodePredicate
50 {
DirtyNodePredicategraph_t::DirtyNodePredicate51     DirtyNodePredicate() {} // Default ctor is required
DirtyNodePredicategraph_t::DirtyNodePredicate52     DirtyNodePredicate(graph_t::rfnoc_graph_t& graph) : _graph(&graph) {}
53 
54     template <typename Vertex>
operator ()graph_t::DirtyNodePredicate55     bool operator()(const Vertex& v) const
56     {
57         return !get_dirty_props(boost::get(graph_t::vertex_property_t(), *_graph, v))
58                     .empty();
59     }
60 
61 private:
62     // Don't make any attribute const, because default assignment operator
63     // is also required
64     graph_t::rfnoc_graph_t* _graph;
65 };
66 
67 /******************************************************************************
68  * Public API calls
69  *****************************************************************************/
connect(node_ref_t src_node,node_ref_t dst_node,graph_edge_t edge_info)70 void graph_t::connect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edge_info)
71 {
72     node_accessor_t node_accessor{};
73     UHD_LOG_TRACE(LOG_ID,
74         "Connecting block " << src_node->get_unique_id() << ":" << edge_info.src_port
75                             << " -> " << dst_node->get_unique_id() << ":"
76                             << edge_info.dst_port);
77 
78     // Correctly populate edge_info
79     edge_info.src_blockid = src_node->get_unique_id();
80     edge_info.dst_blockid = dst_node->get_unique_id();
81 
82     // Add nodes to graph, if not already in there:
83     _add_node(src_node);
84     _add_node(dst_node);
85     // Find vertex descriptors
86     auto src_vertex_desc = _node_map.at(src_node);
87     auto dst_vertex_desc = _node_map.at(dst_node);
88 
89     // Set resolver callbacks:
90     node_accessor.set_resolve_all_callback(src_node, [this, src_vertex_desc]() {
91         this->resolve_all_properties(resolve_context::NODE_PROP, src_vertex_desc);
92     });
93     node_accessor.set_resolve_all_callback(dst_node, [this, dst_vertex_desc]() {
94         this->resolve_all_properties(resolve_context::NODE_PROP, dst_vertex_desc);
95     });
96     // Set post action callbacks:
97     node_accessor.set_post_action_callback(
98         src_node, [this, src_node](const res_source_info& src, action_info::sptr action) {
99             this->enqueue_action(src_node, src, action);
100         });
101     node_accessor.set_post_action_callback(
102         dst_node, [this, dst_node](const res_source_info& src, action_info::sptr action) {
103             this->enqueue_action(dst_node, src, action);
104         });
105 
106     // Check if edge exists
107     auto out_edge_range = boost::out_edges(src_vertex_desc, _graph);
108     for (auto edge_it = out_edge_range.first; edge_it != out_edge_range.second;
109          ++edge_it) {
110         auto existing_edge_info = boost::get(edge_property_t(), _graph, *edge_it);
111 
112         // if exact edge exists, do nothing and return
113         if (existing_edge_info == edge_info) {
114             UHD_LOG_INFO(LOG_ID,
115                 "Ignoring repeated call to connect "
116                     << edge_info.src_blockid << ":" << edge_info.src_port << " -> "
117                     << edge_info.dst_blockid << ":" << edge_info.dst_port);
118             return;
119         }
120 
121         // if there is already an edge for the source block and port
122         if (existing_edge_info.src_port == edge_info.src_port
123             && existing_edge_info.src_blockid == edge_info.src_blockid) {
124             // if same destination block and port
125             if (existing_edge_info.dst_port == edge_info.dst_port
126                 && existing_edge_info.dst_blockid == edge_info.dst_blockid) {
127                 // attempt to modify edge properties - throw an error
128                 UHD_LOG_ERROR(LOG_ID,
129                     "Caught attempt to modify properties of edge "
130                         << existing_edge_info.src_blockid << ":"
131                         << existing_edge_info.src_port << " -> "
132                         << existing_edge_info.dst_blockid << ":"
133                         << existing_edge_info.dst_port);
134                 throw uhd::rfnoc_error("Caught attempt to modify properties of edge!");
135             } else {
136                 // Attempt to reconnect already connected source block and port
137                 UHD_LOG_ERROR(LOG_ID,
138                     "Attempting to reconnect output port "
139                         << existing_edge_info.src_blockid << ":"
140                         << existing_edge_info.src_port);
141                 throw uhd::rfnoc_error("Attempting to reconnect output port!");
142             }
143         }
144     }
145     auto in_edge_range = boost::in_edges(dst_vertex_desc, _graph);
146     for (auto edge_it = in_edge_range.first; edge_it != in_edge_range.second; ++edge_it) {
147         auto existing_edge_info = boost::get(edge_property_t(), _graph, *edge_it);
148         if (edge_info.dst_blockid == existing_edge_info.dst_blockid
149             && edge_info.dst_port == existing_edge_info.dst_port) {
150             UHD_LOG_ERROR(LOG_ID,
151                 "Attempting to reconnect input port " << existing_edge_info.dst_blockid
152                                                       << ":"
153                                                       << existing_edge_info.dst_port);
154             throw uhd::rfnoc_error("Attempting to reconnect input port!");
155         }
156     }
157 
158     // Create edge
159     auto edge_descriptor =
160         boost::add_edge(src_vertex_desc, dst_vertex_desc, edge_info, _graph);
161     UHD_ASSERT_THROW(edge_descriptor.second);
162 
163     // Now make sure we didn't add an unintended cycle
164     try {
165         _get_topo_sorted_nodes();
166     } catch (const uhd::rfnoc_error&) {
167         UHD_LOG_ERROR(LOG_ID,
168             "Adding edge " << src_node->get_unique_id() << ":" << edge_info.src_port
169                            << " -> " << dst_node->get_unique_id() << ":"
170                            << edge_info.dst_port
171                            << " without disabling property_propagation_active will lead "
172                               "to unresolvable graph!");
173         boost::remove_edge(edge_descriptor.first, _graph);
174         throw uhd::rfnoc_error(
175             "Adding edge without disabling property_propagation_active will lead "
176             "to unresolvable graph!");
177     }
178 }
179 
disconnect(node_ref_t src_node,node_ref_t dst_node,graph_edge_t edge_info)180 void graph_t::disconnect(node_ref_t src_node, node_ref_t dst_node, graph_edge_t edge_info)
181 {
182     // Find vertex descriptor
183     if (_node_map.count(src_node) == 0 && _node_map.count(dst_node) == 0) {
184         return;
185     }
186 
187     auto src_vertex_desc = _node_map.at(src_node);
188     auto dst_vertex_desc = _node_map.at(dst_node);
189 
190     edge_info.src_blockid = src_node->get_unique_id();
191     edge_info.dst_blockid = dst_node->get_unique_id();
192 
193     boost::remove_out_edge_if(src_vertex_desc,
194         [this, edge_info](rfnoc_graph_t::edge_descriptor edge_desc) {
195             return (edge_info == boost::get(edge_property_t(), this->_graph, edge_desc));
196         },
197         _graph);
198 
199     if (boost::degree(src_vertex_desc, _graph) == 0) {
200         _remove_node(src_node);
201     }
202 
203     if (boost::degree(dst_vertex_desc, _graph) == 0) {
204         _remove_node(dst_node);
205     }
206 }
207 
remove(node_ref_t node)208 void graph_t::remove(node_ref_t node)
209 {
210     _remove_node(node);
211 }
212 
commit()213 void graph_t::commit()
214 {
215     std::lock_guard<std::recursive_mutex> l(_release_mutex);
216     if (_release_count) {
217         _release_count--;
218     }
219     if (_release_count == 0) {
220         _check_topology();
221         resolve_all_properties(resolve_context::INIT, *boost::vertices(_graph).first);
222     }
223 }
224 
release()225 void graph_t::release()
226 {
227     std::lock_guard<std::recursive_mutex> l(_release_mutex);
228     UHD_LOG_TRACE(LOG_ID, "graph::release() => " << _release_count);
229     _release_count++;
230 }
231 
shutdown()232 void graph_t::shutdown()
233 {
234     std::lock_guard<std::recursive_mutex> l(_release_mutex);
235     UHD_LOG_TRACE(LOG_ID, "graph::shutdown()");
236     _shutdown      = true;
237     _release_count = std::numeric_limits<size_t>::max();
238 }
239 
enumerate_edges()240 std::vector<graph_t::graph_edge_t> graph_t::enumerate_edges()
241 {
242     auto e_iterators = boost::edges(_graph);
243     std::vector<graph_edge_t> result;
244     for (auto e_it = e_iterators.first; e_it != e_iterators.second; ++e_it) {
245         graph_edge_t edge_info = boost::get(edge_property_t(), _graph, *e_it);
246         // This is probably the dumbest way to make sure that the in- and out-
247         // edges don't both get stashed, but it works for now
248         if (std::find(result.begin(), result.end(), edge_info) == result.end()) {
249             result.push_back(boost::get(edge_property_t(), _graph, *e_it));
250         }
251     }
252     return result;
253 }
254 
255 /******************************************************************************
256  * Private methods to be called by friends
257  *****************************************************************************/
resolve_all_properties(resolve_context context,rfnoc_graph_t::vertex_descriptor initial_node)258 void graph_t::resolve_all_properties(
259     resolve_context context, rfnoc_graph_t::vertex_descriptor initial_node)
260 {
261     if (boost::num_vertices(_graph) == 0) {
262         return;
263     }
264 
265     node_accessor_t node_accessor{};
266     // We can't release during property propagation, so we lock this entire
267     // method to make sure that a) different threads can't interfere with each
268     // other, and b) that we don't release the graph while this method is still
269     // running.
270     std::lock_guard<std::recursive_mutex> l(_release_mutex);
271     if (_shutdown) {
272         return;
273     }
274     if (_release_count) {
275         node_ref_t current_node = boost::get(vertex_property_t(), _graph, initial_node);
276         UHD_LOG_TRACE(LOG_ID,
277             "Only resolving node " << current_node->get_unique_id()
278                                    << ", graph is not committed!");
279         // On current node, call local resolution.
280         node_accessor.resolve_props(current_node);
281         // Now mark all properties on this node as clean
282         node_accessor.clean_props(current_node);
283         return;
284     }
285 
286     // First, find the node on which we'll start.
287     auto initial_dirty_nodes = _find_dirty_nodes();
288     if (initial_dirty_nodes.size() > 1) {
289         UHD_LOGGER_WARNING(LOG_ID)
290             << "Found " << initial_dirty_nodes.size()
291             << " dirty nodes in initial search (expected one or zero). "
292                "Property propagation may resolve this.";
293         for (auto& vertex : initial_dirty_nodes) {
294             node_ref_t node = boost::get(vertex_property_t(), _graph, vertex);
295             UHD_LOG_WARNING(LOG_ID, "Dirty: " << node->get_unique_id());
296         }
297     }
298 
299     // Now get all nodes in topologically sorted order, and the appropriate
300     // iterators.
301     auto topo_sorted_nodes = _get_topo_sorted_nodes();
302     auto node_it           = topo_sorted_nodes.begin();
303     auto begin_it          = topo_sorted_nodes.begin();
304     auto end_it            = topo_sorted_nodes.end();
305     while (*node_it != initial_node) {
306         // We know *node_it must be == initial_node at some point, because
307         // otherwise, initial_dirty_nodes would have been empty
308         node_it++;
309     }
310 
311     // Start iterating over nodes
312     bool forward_dir   = true;
313     int num_iterations = 0;
314     // If all edge properties were known at the beginning, a single iteration
315     // would suffice. However, usually during the first time the property
316     // propagation is run, blocks create new (dynamic) edge properties that
317     // default to dirty. If we had a way of knowing when that happens, we could
318     // dynamically increase the number of iterations during the loop. For now,
319     // we simply hard-code the number of iterations to 2 so that we catch that
320     // case without any additional complications.
321     constexpr int MAX_NUM_ITERATIONS = 2;
322     while (true) {
323         node_ref_t current_node = boost::get(vertex_property_t(), _graph, *node_it);
324         UHD_LOG_TRACE(
325             LOG_ID, "Now resolving next node: " << current_node->get_unique_id());
326 
327         // On current node, call local resolution. This may cause other
328         // properties to become dirty.
329         try {
330             node_accessor.resolve_props(current_node);
331         } catch (const uhd::resolve_error& ex) {
332             UHD_LOG_ERROR(LOG_ID, current_node->get_unique_id() + ": " + ex.what());
333             throw;
334         }
335 
336         //  Forward all edge props in all directions from current node. We make
337         //  sure to skip properties if the edge is flagged as
338         //  !property_propagation_active
339         _forward_edge_props(*node_it);
340 
341         // Now mark all properties on this node as clean
342         node_accessor.clean_props(current_node);
343 
344         // If the property resolution was triggered by a node updating one of
345         // its properties, we can stop anytime there are no more dirty nodes.
346         if (context == resolve_context::NODE_PROP && _find_dirty_nodes().empty()) {
347             UHD_LOG_TRACE(LOG_ID,
348                 "Terminating graph resolution early during iteration " << num_iterations);
349             break;
350         }
351 
352         // The rest of the code in this loop is to figure out who's the next
353         // node. First, increment (or decrement) iterator:
354         if (forward_dir) {
355             node_it++;
356             // If we're at the end, flip the direction
357             if (node_it == end_it) {
358                 forward_dir = false;
359                 // Back off from the sentinel:
360                 node_it--;
361             }
362         }
363         if (!forward_dir) {
364             if (topo_sorted_nodes.size() > 1) {
365                 node_it--;
366                 // If we're back at the front, flip direction
367                 if (node_it == begin_it) {
368                     forward_dir = true;
369                 }
370             } else {
371                 forward_dir = true;
372             }
373         }
374         // If we're going forward, and the next node is the initial node,
375         // we've gone full circle (one full iteration).
376         if (forward_dir && (*node_it == initial_node)) {
377             num_iterations++;
378             if (num_iterations == MAX_NUM_ITERATIONS || _find_dirty_nodes().empty()) {
379                 UHD_LOG_TRACE(LOG_ID,
380                     "Terminating graph resolution after iteration " << num_iterations);
381                 break;
382             }
383         }
384     }
385 
386     // Post-iteration sanity checks:
387     // First, we make sure that there are no dirty properties left. If there are,
388     // that means our algorithm couldn't converge and we have a problem.
389     auto remaining_dirty_nodes = _find_dirty_nodes();
390     if (!remaining_dirty_nodes.empty()) {
391         UHD_LOG_ERROR(LOG_ID, "The following properties could not be resolved:");
392         for (auto& vertex : remaining_dirty_nodes) {
393             node_ref_t node           = boost::get(vertex_property_t(), _graph, vertex);
394             const std::string node_id = node->get_unique_id();
395             auto dirty_props          = get_dirty_props(node);
396             for (auto& prop : dirty_props) {
397                 UHD_LOG_ERROR(LOG_ID,
398                     "Dirty: " << node_id << "[" << prop->get_src_info().to_string() << " "
399                               << prop->get_id() << "]");
400             }
401         }
402         throw uhd::resolve_error("Could not resolve properties.");
403     }
404 
405     // Second, go through edges marked !property_propagation_active and make
406     // sure that they match up
407     BackEdgePredicate back_edge_filter(_graph);
408     auto e_iterators =
409         boost::edges(boost::filtered_graph<rfnoc_graph_t, BackEdgePredicate>(
410             _graph, back_edge_filter));
411     bool back_edges_valid = true;
412     for (auto e_it = e_iterators.first; e_it != e_iterators.second; ++e_it) {
413         back_edges_valid = back_edges_valid && _assert_edge_props_consistent(*e_it);
414     }
415     if (!back_edges_valid) {
416         throw uhd::resolve_error(
417             "Error during property resultion: Back-edges inconsistent!");
418     }
419 }
420 
enqueue_action(node_ref_t src_node,res_source_info src_edge,action_info::sptr action)421 void graph_t::enqueue_action(
422     node_ref_t src_node, res_source_info src_edge, action_info::sptr action)
423 {
424     // We can't release during action handling, so we lock this entire
425     // method to make sure that we don't release the graph while this method is
426     // still running.
427     // It also prevents a different thread from throwing in their own actions.
428     std::lock_guard<std::recursive_mutex> release_lock(_release_mutex);
429     if (_shutdown) {
430         return;
431     }
432     if (_release_count) {
433         UHD_LOG_WARNING(LOG_ID,
434             "Action propagation is not enabled, graph is not committed! Will not "
435             "propagate action `"
436                 << action->key << "'");
437         return;
438     }
439 
440     // Check if we're already in the middle of handling actions. In that case,
441     // we're already in the loop below, and then all we want to do is to enqueue
442     // this action tuple. The first call to enqueue_action() within this thread
443     // context will have handling_ongoing == false.
444     const bool handling_ongoing = _action_handling_ongoing.test_and_set();
445     // In any case, stash the new action at the end of the action queue
446     _action_queue.emplace_back(std::make_tuple(src_node, src_edge, action));
447     if (handling_ongoing) {
448         UHD_LOG_TRACE(LOG_ID,
449             "Action handling ongoing, deferring delivery of " << action->key << "#"
450                                                               << action->id);
451         return;
452     }
453 
454     unsigned iteration_count = 0;
455     while (!_action_queue.empty()) {
456         if (iteration_count++ == MAX_ACTION_ITERATIONS) {
457             throw uhd::runtime_error("Terminating action handling: Reached "
458                                      "recursion limit!");
459         }
460 
461         // Unpack next action
462         auto& next_action                  = _action_queue.front();
463         node_ref_t action_src_node         = std::get<0>(next_action);
464         res_source_info action_src_port    = std::get<1>(next_action);
465         action_info::sptr next_action_sptr = std::get<2>(next_action);
466         _action_queue.pop_front();
467 
468         // Find the node that is supposed to receive this action, and if we find
469         // something, then send the action. If the source port's type is USER,
470         // that means the action is meant for us.
471         node_ref_t recipient_node;
472         res_source_info recipient_port(action_src_port);
473 
474         if (action_src_port.type == res_source_info::USER) {
475             recipient_node = action_src_node;
476             recipient_port = action_src_port;
477         } else {
478             auto recipient_info =
479                 _find_neighbour(_node_map.at(action_src_node), action_src_port);
480             recipient_node = recipient_info.first;
481             if (recipient_node == nullptr) {
482                 UHD_LOG_WARNING(LOG_ID,
483                     "Cannot forward action "
484                         << action->key << " from " << src_node->get_unique_id() << ":"
485                         << src_edge.to_string() << ", no neighbour found!");
486                 continue;
487             }
488             recipient_port = {res_source_info::invert_edge(action_src_port.type),
489                 action_src_port.type == res_source_info::INPUT_EDGE
490                     ? recipient_info.second.src_port
491                     : recipient_info.second.dst_port};
492         }
493         // The following call can cause other nodes to add more actions to
494         // the end of _action_queue!
495         UHD_LOG_TRACE(LOG_ID,
496             "Now delivering action "
497                 << next_action_sptr->key << "#" << next_action_sptr->id << " to "
498                 << recipient_node->get_unique_id() << "@" << recipient_port.to_string());
499         node_accessor_t{}.send_action(recipient_node, recipient_port, next_action_sptr);
500     }
501     UHD_LOG_TRACE(LOG_ID, "Delivered all actions, terminating action handling.");
502 
503     // Release the action handling flag
504     _action_handling_ongoing.clear();
505     // Now, the _release_mutex is released, and someone else can start sending
506     // actions.
507 }
508 
509 /******************************************************************************
510  * Private methods
511  *****************************************************************************/
_find_dirty_nodes()512 graph_t::vertex_list_t graph_t::_find_dirty_nodes()
513 {
514     // Create a view on the graph that doesn't include the back-edges
515     DirtyNodePredicate vertex_filter(_graph);
516     boost::filtered_graph<rfnoc_graph_t, boost::keep_all, DirtyNodePredicate> fg(
517         _graph, boost::keep_all(), vertex_filter);
518 
519     auto v_iterators = boost::vertices(fg);
520     return vertex_list_t(v_iterators.first, v_iterators.second);
521 }
522 
_get_topo_sorted_nodes()523 graph_t::vertex_list_t graph_t::_get_topo_sorted_nodes()
524 {
525     // Create a view on the graph that doesn't include the back-edges
526     ForwardEdgePredicate edge_filter(_graph);
527     boost::filtered_graph<rfnoc_graph_t, ForwardEdgePredicate> fg(_graph, edge_filter);
528 
529     // Topo-sort and return
530     vertex_list_t sorted_nodes;
531     try {
532         boost::topological_sort(fg, std::front_inserter(sorted_nodes));
533     } catch (boost::not_a_dag&) {
534         throw uhd::rfnoc_error("Cannot resolve graph because it has at least one cycle!");
535     }
536     return sorted_nodes;
537 }
538 
_add_node(node_ref_t new_node)539 void graph_t::_add_node(node_ref_t new_node)
540 {
541     if (_node_map.count(new_node)) {
542         return;
543     }
544 
545     _node_map.emplace(new_node, boost::add_vertex(new_node, _graph));
546 }
547 
_remove_node(node_ref_t node)548 void graph_t::_remove_node(node_ref_t node)
549 {
550     if (_node_map.count(node)) {
551         auto vertex_desc = _node_map.at(node);
552 
553         // Remove all edges
554         boost::clear_vertex(vertex_desc, _graph);
555 
556         // Remove the vertex
557         boost::remove_vertex(vertex_desc, _graph);
558         _node_map.erase(node);
559 
560         // Removing the vertex changes the vertex descriptors,
561         // so update the node map
562         auto vertex_range = boost::vertices(_graph);
563         for (auto vertex_it = vertex_range.first; vertex_it != vertex_range.second;
564              vertex_it++) {
565             auto node       = boost::get(vertex_property_t(), _graph, *vertex_it);
566             _node_map[node] = *vertex_it;
567         }
568     }
569 }
570 
571 
_forward_edge_props(graph_t::rfnoc_graph_t::vertex_descriptor origin)572 void graph_t::_forward_edge_props(graph_t::rfnoc_graph_t::vertex_descriptor origin)
573 {
574     node_accessor_t node_accessor{};
575     node_ref_t origin_node = boost::get(vertex_property_t(), _graph, origin);
576 
577     auto edge_props = node_accessor.filter_props(origin_node, [](property_base_t* prop) {
578         return (prop->get_src_info().type == res_source_info::INPUT_EDGE
579                 || prop->get_src_info().type == res_source_info::OUTPUT_EDGE);
580     });
581     UHD_LOG_TRACE(LOG_ID,
582         "Forwarding up to " << edge_props.size() << " edge properties from node "
583                             << origin_node->get_unique_id());
584 
585     for (auto prop : edge_props) {
586         auto neighbour_node_info = _find_neighbour(origin, prop->get_src_info());
587         if (neighbour_node_info.first != nullptr
588             && neighbour_node_info.second.property_propagation_active) {
589             const size_t neighbour_port = prop->get_src_info().type
590                                                   == res_source_info::INPUT_EDGE
591                                               ? neighbour_node_info.second.src_port
592                                               : neighbour_node_info.second.dst_port;
593             node_accessor.forward_edge_property(
594                 neighbour_node_info.first, neighbour_port, prop);
595         }
596     }
597 }
598 
_assert_edge_props_consistent(rfnoc_graph_t::edge_descriptor edge)599 bool graph_t::_assert_edge_props_consistent(rfnoc_graph_t::edge_descriptor edge)
600 {
601     node_ref_t src_node =
602         boost::get(vertex_property_t(), _graph, boost::source(edge, _graph));
603     node_ref_t dst_node =
604         boost::get(vertex_property_t(), _graph, boost::target(edge, _graph));
605     graph_edge_t edge_info = boost::get(edge_property_t(), _graph, edge);
606 
607     // Helper function to get properties as maps
608     auto get_prop_map = [](const size_t port,
609                             res_source_info::source_t edge_type,
610                             node_ref_t node) {
611         node_accessor_t node_accessor{};
612         // Create a set of all properties
613         auto props_set = node_accessor.filter_props(
614             node, [port, edge_type, node](property_base_t* prop) {
615                 return prop->get_src_info().instance == port
616                        && prop->get_src_info().type == edge_type;
617             });
618         std::unordered_map<std::string, property_base_t*> prop_map;
619         for (auto prop_it = props_set.begin(); prop_it != props_set.end(); ++prop_it) {
620             prop_map.emplace((*prop_it)->get_id(), *prop_it);
621         }
622 
623         return prop_map;
624     };
625 
626     // Create two maps ID -> prop_ptr, so we have an easier time comparing them
627     auto src_prop_map =
628         get_prop_map(edge_info.src_port, res_source_info::OUTPUT_EDGE, src_node);
629     auto dst_prop_map =
630         get_prop_map(edge_info.dst_port, res_source_info::INPUT_EDGE, dst_node);
631 
632     // Now iterate through all properties, and make sure they match
633     bool props_match = true;
634     for (auto src_prop_it = src_prop_map.begin(); src_prop_it != src_prop_map.end();
635          ++src_prop_it) {
636         auto src_prop = src_prop_it->second;
637         auto dst_prop = dst_prop_map.at(src_prop->get_id());
638         if (!src_prop->equal(dst_prop)) {
639             UHD_LOG_ERROR(LOG_ID,
640                 "Edge property " << src_prop->get_id() << " inconsistent on edge "
641                                  << print_edge(src_node, dst_node, edge_info));
642             props_match = false;
643         }
644     }
645 
646     return props_match;
647 }
648 
_check_topology()649 void graph_t::_check_topology()
650 {
651     node_accessor_t node_accessor{};
652     bool topo_ok     = true;
653     auto v_iterators = boost::vertices(_graph);
654     for (auto it = v_iterators.first; it != v_iterators.second; ++it) {
655         node_ref_t node = boost::get(vertex_property_t(), _graph, *it);
656         std::vector<size_t> connected_inputs;
657         std::vector<size_t> connected_outputs;
658         auto ie_iters = boost::in_edges(*it, _graph);
659         for (auto it = ie_iters.first; it != ie_iters.second; ++it) {
660             graph_edge_t edge_info = boost::get(edge_property_t(), _graph, *it);
661             connected_inputs.push_back(edge_info.dst_port);
662         }
663         auto oe_iters = boost::out_edges(*it, _graph);
664         for (auto it = oe_iters.first; it != oe_iters.second; ++it) {
665             graph_edge_t edge_info = boost::get(edge_property_t(), _graph, *it);
666             connected_outputs.push_back(edge_info.src_port);
667         }
668 
669         if (!node_accessor.check_topology(node, connected_inputs, connected_outputs)) {
670             UHD_LOG_ERROR(LOG_ID,
671                 "Node " << node->get_unique_id()
672                         << "cannot handle its current topology! ("
673                         << connected_inputs.size() << "inputs, "
674                         << connected_outputs.size() << " outputs)");
675             topo_ok = false;
676         }
677     }
678 
679     if (!topo_ok) {
680         throw uhd::runtime_error("Graph topology is not valid!");
681     }
682 }
683 
_find_neighbour(rfnoc_graph_t::vertex_descriptor origin,res_source_info port_info)684 std::pair<graph_t::node_ref_t, graph_t::graph_edge_t> graph_t::_find_neighbour(
685     rfnoc_graph_t::vertex_descriptor origin, res_source_info port_info)
686 {
687     if (port_info.type == res_source_info::INPUT_EDGE) {
688         auto it_range = boost::in_edges(origin, _graph);
689         for (auto it = it_range.first; it != it_range.second; ++it) {
690             graph_edge_t edge_info = boost::get(edge_property_t(), _graph, *it);
691             if (edge_info.dst_port == port_info.instance) {
692                 return {
693                     boost::get(vertex_property_t(), _graph, boost::source(*it, _graph)),
694                     edge_info};
695             }
696         }
697         return {nullptr, {}};
698     }
699     if (port_info.type == res_source_info::OUTPUT_EDGE) {
700         auto it_range = boost::out_edges(origin, _graph);
701         for (auto it = it_range.first; it != it_range.second; ++it) {
702             graph_edge_t edge_info = boost::get(edge_property_t(), _graph, *it);
703             if (edge_info.src_port == port_info.instance) {
704                 return {
705                     boost::get(vertex_property_t(), _graph, boost::target(*it, _graph)),
706                     edge_info};
707             }
708         }
709         return {nullptr, {}};
710     }
711 
712     UHD_THROW_INVALID_CODE_PATH();
713 }
714