1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
7 
8 #include <uhd/exception.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhdlib/rfnoc/chdr_ctrl_xport.hpp>
11 #include <uhdlib/rfnoc/chdr_packet_writer.hpp>
12 #include <uhdlib/rfnoc/mgmt_portal.hpp>
13 #include <unordered_set>
14 #include <boost/format.hpp>
15 #include <cmath>
16 #include <mutex>
17 #include <queue>
18 
19 namespace uhd { namespace rfnoc { namespace mgmt {
20 
21 using namespace chdr;
22 using namespace transport;
23 
//! Daisy-chaining switch; the code that consults it lies outside this section
constexpr bool ALLOW_DAISY_CHAINING = true;

// Register offsets of a stream endpoint. Several of them are used below as the
// address argument of mgmt_op_t::cfg_payload(). The trailing tag repeats the
// access annotation of each register (RW, W or R).
constexpr uint16_t REG_EPID_SELF               = 0x00; //!< RW
constexpr uint16_t REG_RESET_AND_FLUSH         = 0x04; //!< W
constexpr uint16_t REG_OSTRM_CTRL_STATUS       = 0x08; //!< RW
constexpr uint16_t REG_OSTRM_DST_EPID          = 0x0C; //!< W
constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_LO  = 0x10; //!< W
constexpr uint16_t REG_OSTRM_FC_FREQ_BYTES_HI  = 0x14; //!< W
constexpr uint16_t REG_OSTRM_FC_FREQ_PKTS      = 0x18; //!< W
constexpr uint16_t REG_OSTRM_FC_HEADROOM       = 0x1C; //!< W
constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20; //!< R
constexpr uint16_t REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24; //!< R
constexpr uint16_t REG_OSTRM_BUFF_CAP_PKTS     = 0x28; //!< R
constexpr uint16_t REG_OSTRM_SEQ_ERR_CNT       = 0x2C; //!< R
constexpr uint16_t REG_OSTRM_DATA_ERR_CNT      = 0x30; //!< R
constexpr uint16_t REG_OSTRM_ROUTE_ERR_CNT     = 0x34; //!< R
constexpr uint16_t REG_ISTRM_CTRL_STATUS       = 0x38; //!< RW

// Bit flags written to REG_RESET_AND_FLUSH
constexpr uint32_t RESET_AND_FLUSH_OSTRM = 0x1;
constexpr uint32_t RESET_AND_FLUSH_ISTRM = 0x2;
constexpr uint32_t RESET_AND_FLUSH_CTRL  = 0x4;
//! All three flags above combined (== 0x7)
constexpr uint32_t RESET_AND_FLUSH_ALL =
    RESET_AND_FLUSH_OSTRM | RESET_AND_FLUSH_ISTRM | RESET_AND_FLUSH_CTRL;
46 
47 #ifdef UHD_BIG_ENDIAN
48 constexpr endianness_t HOST_ENDIANNESS = ENDIANNESS_BIG;
49 #else
50 constexpr endianness_t HOST_ENDIANNESS = ENDIANNESS_LITTLE;
51 #endif
52 
/*! Pack stream configuration settings into one control/status register word.
 *
 * Layout of the returned word (bit 0 is the LSB):
 *  - bit 0:          cfg_start
 *  - bit 1:          xport_lossy
 *  - from bit 2 up:  pyld_buff_fmt  (value shifted left by 2, not masked)
 *  - from bit 4 up:  mdata_buff_fmt (value shifted left by 4, not masked)
 *  - bit 6:          byte_swap
 *
 * The body is a single return statement so the function stays usable as a
 * constant expression under C++11 rules.
 */
constexpr uint32_t BUILD_CTRL_STATUS_WORD(bool cfg_start,
    bool xport_lossy,
    sw_buff_t pyld_buff_fmt,
    sw_buff_t mdata_buff_fmt,
    bool byte_swap)
{
    return (static_cast<uint32_t>(cfg_start) << 0)
           | (static_cast<uint32_t>(xport_lossy) << 1)
           | (static_cast<uint32_t>(pyld_buff_fmt) << 2)
           | (static_cast<uint32_t>(mdata_buff_fmt) << 4)
           | (static_cast<uint32_t>(byte_swap) << 6);
}
63 
64 constexpr uint32_t STRM_STATUS_FC_ENABLED    = 0x80000000;
65 constexpr uint32_t STRM_STATUS_SETUP_ERR     = 0x40000000;
66 constexpr uint32_t STRM_STATUS_SETUP_PENDING = 0x20000000;
67 
68 
//! Kinds of nodes that can appear in the data-flow graph
enum node_type {
    NODE_TYPE_INVALID = 0, //!< Invalid type. The FPGA will never have a node with type = 0
    NODE_TYPE_XBAR    = 1, //!< CHDR crossbar
    NODE_TYPE_STRM_EP = 2, //!< Stream endpoint
    NODE_TYPE_XPORT   = 3  //!< Transport
};
80 
81 //! A unique identifier for a node
82 struct node_id_t
83 {
84     //! A unique ID for device that houses this node
85     device_id_t device_id = NULL_DEVICE_ID;
86     //! The type of this node
87     node_type type = NODE_TYPE_INVALID;
88     //! The instance number of this node in the device
89     sep_inst_t inst = 0;
90     //! Extended info about node (not used for comparisons)
91     uint32_t extended_info = 0;
92 
93     // ctors and operators
94     node_id_t()                     = default;
95     node_id_t(const node_id_t& rhs) = default;
node_id_tuhd::rfnoc::mgmt::node_id_t96     node_id_t(device_id_t device_id_, node_type type_, sep_inst_t inst_)
97         : device_id(device_id_), type(type_), inst(inst_), extended_info(0)
98     {
99     }
node_id_tuhd::rfnoc::mgmt::node_id_t100     node_id_t(device_id_t device_id_,
101         node_type type_,
102         sep_inst_t inst_,
103         uint32_t extended_info_)
104         : device_id(device_id_), type(type_), inst(inst_), extended_info(extended_info_)
105     {
106     }
node_id_tuhd::rfnoc::mgmt::node_id_t107     node_id_t(const sep_addr_t& sep_addr)
108         : device_id(sep_addr.first)
109         , type(NODE_TYPE_STRM_EP)
110         , inst(sep_addr.second)
111         , extended_info(0)
112     {
113     }
114 
unique_iduhd::rfnoc::mgmt::node_id_t115     inline uint64_t unique_id() const
116     {
117         return (static_cast<uint64_t>(inst) + (static_cast<uint64_t>(device_id) << 16)
118                 + (static_cast<uint64_t>(type) << 32));
119     }
to_stringuhd::rfnoc::mgmt::node_id_t120     inline std::string to_string() const
121     {
122         static const std::map<node_type, std::string> NODE_STR = {
123             {NODE_TYPE_INVALID, "unknown"},
124             {NODE_TYPE_XBAR, "xbar"},
125             {NODE_TYPE_STRM_EP, "sep"},
126             {NODE_TYPE_XPORT, "xport"}};
127         return str(
128             boost::format("device:%d/%s:%d") % device_id % NODE_STR.at(type) % inst);
129     }
130 
operator <(const node_id_t & lhs,const node_id_t & rhs)131     inline friend bool operator<(const node_id_t& lhs, const node_id_t& rhs)
132     {
133         return (lhs.unique_id() < rhs.unique_id());
134     }
operator ==(const node_id_t & lhs,const node_id_t & rhs)135     inline friend bool operator==(const node_id_t& lhs, const node_id_t& rhs)
136     {
137         return (lhs.unique_id() == rhs.unique_id());
138     }
operator !=(const node_id_t & lhs,const node_id_t & rhs)139     inline friend bool operator!=(const node_id_t& lhs, const node_id_t& rhs)
140     {
141         return (lhs.unique_id() != rhs.unique_id());
142     }
143     inline node_id_t& operator=(const node_id_t&) = default;
144 };
145 
146 //! The local destination to take at the current node to reach the next node
147 //  - If negative, then no specific action necessary
148 //  - If non-negative, then route (select destination) to the value
149 using next_dest_t = int32_t;
150 
151 //! An address that allows locating a node in a data-flow network starting from
152 //  a specific stream endpoint. The address is a collection (vector) of nodes and
153 //  the respective routing decisions to get to the final node.
154 using node_addr_t = std::vector<std::pair<node_id_t, next_dest_t>>;
155 
to_string(const node_addr_t & node_addr)156 std::string to_string(const node_addr_t& node_addr)
157 {
158     if (!node_addr.empty()) {
159         std::string str("");
160         for (const auto& hop : node_addr) {
161             str += hop.first.to_string() + std::string(",") + std::to_string(hop.second)
162                    + std::string("->");
163         }
164         return str;
165     } else {
166         return std::string("<empty>");
167     }
168 }
169 
170 // Empty dtor for stream_manager
~mgmt_portal()171 mgmt_portal::~mgmt_portal() {}
172 
173 //---------------------------------------------------------------
174 // Stream Manager Implementation
175 //---------------------------------------------------------------
176 class mgmt_portal_impl : public mgmt_portal
177 {
178 public:
/*! Create the portal and discover what is reachable through \p xport.
 *
 * \param xport Control transport used for the discovery transactions; its EPID
 *              is used as the instance number of this portal's own node ID
 * \param pkt_factory Provides the protocol version, CHDR width and endianness,
 *                    and creates the management packets kept for send and receive
 * \param my_sep_addr Address of this software stream endpoint; only its first
 *                    element (the device ID) is used here
 */
mgmt_portal_impl(chdr_ctrl_xport& xport,
    const chdr::chdr_packet_factory& pkt_factory,
    sep_addr_t my_sep_addr)
    : _protover(pkt_factory.get_protover())
    , _chdr_w(pkt_factory.get_chdr_w())
    , _endianness(pkt_factory.get_endianness())
    , _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, xport.get_epid())
    , _send_seqnum(0)
    // The factory results are temporaries already: wrapping them in std::move()
    // adds nothing and only prevents copy elision.
    , _send_pkt(pkt_factory.make_mgmt())
    , _recv_pkt(pkt_factory.make_mgmt())
{
    // Discovery runs under the same mutex as every other portal operation
    std::lock_guard<std::recursive_mutex> lock(_mutex);
    _discover_topology(xport);

    // Report the endpoints recorded in _discovered_ep_set, one per line
    UHD_LOG_DEBUG("RFNOC::MGMT",
        "The following endpoints are reachable from " << _my_node_id.to_string());
    for (const auto& ep : _discovered_ep_set) {
        UHD_LOG_DEBUG("RFNOC::MGMT", "* " << ep.first << ":" << ep.second);
    }
}
198 
//! Nothing beyond member destruction is required
virtual ~mgmt_portal_impl() = default;
200 
get_reachable_endpoints() const201     virtual const std::set<sep_addr_t>& get_reachable_endpoints() const
202     {
203         return _discovered_ep_set;
204     }
205 
initialize_endpoint(chdr_ctrl_xport & xport,const sep_addr_t & addr,const sep_id_t & epid)206     virtual void initialize_endpoint(
207         chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid)
208     {
209         std::lock_guard<std::recursive_mutex> lock(_mutex);
210 
211         auto my_epid = xport.get_epid();
212 
213         // Create a node ID from lookup info
214         node_id_t lookup_node(addr.first, NODE_TYPE_STRM_EP, addr.second);
215         if (_node_addr_map.count(lookup_node) == 0) {
216             throw uhd::lookup_error(
217                 "initialize_endpoint(): Cannot reach node with specified address.");
218         }
219         const node_addr_t& node_addr = _node_addr_map.at(lookup_node);
220 
221         // Build a management transaction to first get to the node
222         mgmt_payload cfg_xact;
223         cfg_xact.set_header(my_epid, _protover, _chdr_w);
224         _traverse_to_node(cfg_xact, node_addr);
225 
226         mgmt_hop_t cfg_hop;
227         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
228             mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ALL)));
229         cfg_hop.add_op(mgmt_op_t(
230             mgmt_op_t::MGMT_OP_CFG_WR_REQ, mgmt_op_t::cfg_payload(REG_EPID_SELF, epid)));
231         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
232         cfg_xact.add_hop(cfg_hop);
233 
234         // Send the transaction and receive a response.
235         // We don't care about the contents of the response.
236         _send_recv_mgmt_transaction(xport, cfg_xact);
237         register_endpoint(addr, epid);
238     }
239 
register_endpoint(const sep_addr_t & addr,const sep_id_t & epid)240     virtual void register_endpoint(const sep_addr_t& addr, const sep_id_t& epid)
241     {
242         std::lock_guard<std::recursive_mutex> lock(_mutex);
243         if (is_endpoint_registered(epid)) {
244             return;
245         }
246         // Create a node ID from lookup info
247         node_id_t lookup_node(addr.first, NODE_TYPE_STRM_EP, addr.second);
248         if (_node_addr_map.count(lookup_node) == 0) {
249             throw uhd::lookup_error(
250                 "initialize_endpoint(): Cannot reach node with specified address.");
251         }
252         // Add/update the entry in the stream endpoint ID map
253         _epid_addr_map[epid] = addr;
254         UHD_LOG_DEBUG("RFNOC::MGMT",
255             (boost::format("Bound stream endpoint with Addr=(%d,%d) to EPID=%d")
256                 % addr.first % addr.second % epid));
257         UHD_LOG_TRACE("RFNOC::MGMT",
258             (boost::format(
259                  "Stream endpoint with EPID=%d can be reached by taking the path: %s")
260                 % epid % to_string(_node_addr_map.at(lookup_node))));
261     }
262 
is_endpoint_registered(const sep_id_t & epid) const263     virtual bool is_endpoint_registered(const sep_id_t& epid) const
264     {
265         std::lock_guard<std::recursive_mutex> lock(_mutex);
266         return (_epid_addr_map.count(epid) > 0);
267     }
268 
get_endpoint_info(const sep_id_t & epid) const269     virtual sep_info_t get_endpoint_info(const sep_id_t& epid) const
270     {
271         std::lock_guard<std::recursive_mutex> lock(_mutex);
272 
273         // Lookup the destination node address using the endpoint ID
274         if (_epid_addr_map.count(epid) == 0) {
275             throw uhd::lookup_error(
276                 "get_endpoint_info(): Could not find a stream with specified ID.");
277         }
278         node_id_t lookup_node(_epid_addr_map.at(epid));
279         // If a node is in _epid_addr_map then it must be in _node_addr_map
280         UHD_ASSERT_THROW(_node_addr_map.count(lookup_node) > 0);
281         // Why is key_node different from lookup_node?
282         // Because it has additional extended info (look at operator< def)
283         const node_id_t& key_node = _node_addr_map.find(lookup_node)->first;
284 
285         // Build a return val
286         sep_info_t retval;
287         retval.has_ctrl        = (key_node.extended_info >> 0) & 0x1;
288         retval.has_data        = (key_node.extended_info >> 1) & 0x1;
289         retval.num_input_ports = retval.has_data ? ((key_node.extended_info >> 2) & 0x3F)
290                                                  : 0;
291         retval.num_output_ports = retval.has_data ? ((key_node.extended_info >> 8) & 0x3F)
292                                                   : 0;
293         retval.reports_strm_errs = (key_node.extended_info >> 14) & 0x1;
294         retval.addr              = _epid_addr_map.at(epid);
295         return retval;
296     }
297 
setup_local_route(chdr_ctrl_xport & xport,const sep_id_t & dst_epid)298     virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid)
299     {
300         std::lock_guard<std::recursive_mutex> lock(_mutex);
301         auto my_epid = xport.get_epid();
302 
303         // Lookup the physical stream endpoint address using the endpoint ID
304         const node_addr_t& node_addr = _lookup_sep_node_addr(dst_epid);
305 
306         node_addr_t route_addr = node_addr_t();
307         route_addr.push_back(std::make_pair(_my_node_id, next_dest_t(-1)));
308         for (const auto& addr_pair : node_addr) {
309             mgmt_payload init_req_xact;
310             _traverse_to_node(init_req_xact, route_addr);
311             _push_node_init_hop(init_req_xact, addr_pair.first, my_epid);
312             const mgmt_payload resp_xact =
313                 _send_recv_mgmt_transaction(xport, init_req_xact);
314             route_addr.push_back(addr_pair);
315         }
316 
317         // Build a management transaction to configure all the nodes in the path going to
318         // dst_epid
319         mgmt_payload cfg_xact;
320         cfg_xact.set_header(my_epid, _protover, _chdr_w);
321 
322         for (const auto& addr_pair : node_addr) {
323             const node_id_t& curr_node   = addr_pair.first;
324             const next_dest_t& curr_dest = addr_pair.second;
325             mgmt_hop_t curr_cfg_hop;
326             switch (curr_node.type) {
327                 case NODE_TYPE_XBAR: {
328                     // Configure the routing table to route all packets going to dst_epid
329                     // to the port with index next_dest_t
330                     curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
331                         mgmt_op_t::cfg_payload(dst_epid, curr_dest)));
332                     curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST,
333                         mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(curr_dest))));
334                 } break;
335                 case NODE_TYPE_XPORT: {
336                     uint8_t node_subtype =
337                         static_cast<uint8_t>(curr_node.extended_info & 0xFF);
338                     // Run a hop configuration function for custom transports
339                     if (_rtcfg_cfg_fns.count(node_subtype)) {
340                         _rtcfg_cfg_fns.at(node_subtype)(curr_node.device_id,
341                             curr_node.inst,
342                             node_subtype,
343                             curr_cfg_hop);
344                     } else {
345                         curr_cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
346                     }
347                 } break;
348                 case NODE_TYPE_STRM_EP: {
349                     // Stream are not involved in routing, so do nothing
350                 } break;
351                 default: {
352                     UHD_THROW_INVALID_CODE_PATH();
353                 } break;
354             }
355             // Add this hop to the trancation only if it's not empty
356             if (curr_cfg_hop.get_num_ops() > 0) {
357                 cfg_xact.add_hop(curr_cfg_hop);
358             }
359         }
360 
361         // If we follow this route then we should end up at a stream endpoint
362         // so add an extra hop and return the packet back with the node info we will
363         // sanity check it later
364         mgmt_hop_t discover_hop;
365         discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ));
366         discover_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
367         cfg_xact.add_hop(discover_hop);
368 
369         // Send the transaction and validate that we saw a stream endpoint
370         const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact);
371         const node_id_t sep_node         = _pop_node_discovery_hop(sep_info_xact);
372         if (sep_node.type != NODE_TYPE_STRM_EP) {
373             throw uhd::routing_error(
374                 "Route setup failed. Could not confirm terminal stream endpoint");
375         }
376 
377         UHD_LOG_DEBUG("RFNOC::MGMT",
378             (boost::format("Established a route from EPID=%d (SW) to EPID=%d")
379                 % xport.get_epid() % dst_epid));
380         UHD_LOG_TRACE("RFNOC::MGMT",
381             (boost::format("The destination for EPID=%d has been added to all routers in "
382                            "the path: %s")
383                 % dst_epid % to_string(node_addr)));
384     }
385 
can_remote_route(const sep_addr_t & dst_addr,const sep_addr_t & src_addr) const386     virtual bool can_remote_route(
387         const sep_addr_t& dst_addr, const sep_addr_t& src_addr) const
388     {
389         std::lock_guard<std::recursive_mutex> lock(_mutex);
390 
391         if ((_discovered_ep_set.count(dst_addr) == 0)
392             || (_discovered_ep_set.count(src_addr) == 0)) {
393             // Can't route to/from something if we don't know about it
394             return false;
395         }
396 
397         UHD_ASSERT_THROW(_node_addr_map.count(node_id_t(dst_addr)) > 0);
398         UHD_ASSERT_THROW(_node_addr_map.count(node_id_t(src_addr)) > 0);
399 
400         // Lookup the src and dst node address using the endpoint ID
401         const node_addr_t& dst_node_addr = _node_addr_map.at(node_id_t(dst_addr));
402         const node_addr_t& src_node_addr = _node_addr_map.at(node_id_t(src_addr));
403 
404         // Find a common parent (could be faster than n^2 but meh, this is easier)
405         for (const auto& dnode : dst_node_addr) {
406             for (const auto& snode : src_node_addr) {
407                 if (dnode.first == snode.first && dnode.first.type == NODE_TYPE_XBAR) {
408                     return true;
409                 }
410             }
411         }
412         return false;
413     }
414 
setup_remote_route(chdr_ctrl_xport & xport,const sep_id_t & dst_epid,const sep_id_t & src_epid)415     virtual void setup_remote_route(
416         chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid)
417     {
418         std::lock_guard<std::recursive_mutex> lock(_mutex);
419 
420         if (not is_endpoint_registered(dst_epid)) {
421             throw uhd::routing_error("Route setup failed. The destination endpoint was "
422                                      "not bound to an EPID and registered");
423         }
424         if (not is_endpoint_registered(src_epid)) {
425             throw uhd::routing_error("Route setup failed. The source endpoint was "
426                                      "not bound to an EPID and registered");
427         }
428 
429         if (not can_remote_route(
430                 _epid_addr_map.at(dst_epid), _epid_addr_map.at(src_epid))) {
431             throw uhd::routing_error("Route setup failed. The endpoints don't share a "
432                                      "common crossbar parent.");
433         }
434 
435         // If we setup local routes from this host to both the source and destination
436         // endpoints then the routing algorithm will guarantee that packet between src and
437         // dst will have a path between them as long as they share a common parent
438         // (crossbar). The assumption is verified above. It is also guaranteed that the
439         // path between them will be the shortest one. It is possible that we are
440         // configuring more crossbars than necessary but we do this for simplicity. If
441         // there is a need to optimize for routing table fullness, we can do a software
442         // graph traversal here, find the closest common parent (crossbar) for the two
443         // nodes and only configure the nodes downstream of that.
444         setup_local_route(xport, dst_epid);
445         setup_local_route(xport, src_epid);
446 
447         UHD_LOG_DEBUG("RFNOC::MGMT",
448             (boost::format(
449                  "The two routes above now enable a route from EPID=%d to EPID=%s")
450                 % src_epid % dst_epid));
451     }
452 
config_local_rx_stream_start(chdr_ctrl_xport & xport,const sep_id_t & epid,const bool lossy_xport,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const stream_buff_params_t & fc_freq,const stream_buff_params_t & fc_headroom,const bool reset=false)453     virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport,
454         const sep_id_t& epid,
455         const bool lossy_xport,
456         const sw_buff_t pyld_buff_fmt,
457         const sw_buff_t mdata_buff_fmt,
458         const stream_buff_params_t& fc_freq,
459         const stream_buff_params_t& fc_headroom,
460         const bool reset = false)
461     {
462         std::lock_guard<std::recursive_mutex> lock(_mutex);
463         auto my_epid = xport.get_epid();
464 
465         // The discovery process has already setup a route from the
466         // destination to us. No additional action is necessary.
467 
468         const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
469 
470         // Build a management transaction to first get to the node
471         mgmt_payload cfg_xact;
472         cfg_xact.set_header(my_epid, _protover, _chdr_w);
473         _traverse_to_node(cfg_xact, node_addr);
474 
475         mgmt_hop_t cfg_hop;
476         // Assert reset if requested
477         if (reset) {
478             cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
479                 mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_OSTRM)));
480         }
481         // Set destination of the stream to us (this endpoint)
482         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
483             mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, my_epid)));
484         // Configure flow control parameters
485         _push_ostrm_flow_control_config(lossy_xport,
486             pyld_buff_fmt,
487             mdata_buff_fmt,
488             _endianness != HOST_ENDIANNESS,
489             fc_freq,
490             fc_headroom,
491             cfg_hop);
492         // Return the packet back to us
493         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
494 
495         // Send the transaction and receive a response.
496         // We don't care about the contents of the response.
497         cfg_xact.add_hop(cfg_hop);
498         _send_recv_mgmt_transaction(xport, cfg_xact);
499 
500         UHD_LOG_DEBUG("RFNOC::MGMT",
501             (boost::format("Initiated RX stream setup for EPID=%d") % epid));
502     }
503 
config_local_rx_stream_commit(chdr_ctrl_xport & xport,const sep_id_t & epid,const double timeout=0.2)504     virtual stream_buff_params_t config_local_rx_stream_commit(
505         chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2)
506     {
507         std::lock_guard<std::recursive_mutex> lock(_mutex);
508 
509         // Wait for stream configuration to finish on the HW side
510         const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
511         _validate_stream_setup(xport, node_addr, timeout);
512 
513         UHD_LOG_DEBUG("RFNOC::MGMT",
514             (boost::format("Finished RX stream setup for EPID=%d") % epid));
515 
516         // Return discovered buffer parameters
517         return std::get<1>(_get_ostrm_status(xport, node_addr));
518     }
519 
config_local_tx_stream(chdr_ctrl_xport & xport,const sep_id_t & epid,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const bool reset=false)520     virtual void config_local_tx_stream(chdr_ctrl_xport& xport,
521         const sep_id_t& epid,
522         const sw_buff_t pyld_buff_fmt,
523         const sw_buff_t mdata_buff_fmt,
524         const bool reset = false)
525     {
526         std::lock_guard<std::recursive_mutex> lock(_mutex);
527         auto my_epid = xport.get_epid();
528 
529         // First setup a route between to the endpoint
530         setup_local_route(xport, epid);
531 
532         const node_addr_t& node_addr = _lookup_sep_node_addr(epid);
533 
534         // Build a management transaction to first get to the node
535         mgmt_payload cfg_xact;
536         cfg_xact.set_header(my_epid, _protover, _chdr_w);
537         _traverse_to_node(cfg_xact, node_addr);
538 
539         mgmt_hop_t cfg_hop;
540         // Assert reset if requested
541         if (reset) {
542             cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
543                 mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH, RESET_AND_FLUSH_ISTRM)));
544         }
545         // Configure buffer types
546         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
547             mgmt_op_t::cfg_payload(REG_ISTRM_CTRL_STATUS,
548                 BUILD_CTRL_STATUS_WORD(false,
549                     false,
550                     pyld_buff_fmt,
551                     mdata_buff_fmt,
552                     _endianness != HOST_ENDIANNESS))));
553         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
554         cfg_xact.add_hop(cfg_hop);
555 
556         // Send the transaction and receive a response.
557         // We don't care about the contents of the response.
558         _send_recv_mgmt_transaction(xport, cfg_xact);
559 
560         UHD_LOG_DEBUG("RFNOC::MGMT",
561             (boost::format("Finished TX stream setup for EPID=%d") % epid));
562     }
563 
config_remote_stream(chdr_ctrl_xport & xport,const sep_id_t & dst_epid,const sep_id_t & src_epid,const bool lossy_xport,const stream_buff_params_t & fc_freq,const stream_buff_params_t & fc_headroom,const bool reset=false,const double timeout=0.2)564     virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport,
565         const sep_id_t& dst_epid,
566         const sep_id_t& src_epid,
567         const bool lossy_xport,
568         const stream_buff_params_t& fc_freq,
569         const stream_buff_params_t& fc_headroom,
570         const bool reset     = false,
571         const double timeout = 0.2)
572     {
573         std::lock_guard<std::recursive_mutex> lock(_mutex);
574         auto my_epid = xport.get_epid();
575 
576         // First setup a route between the two endpoints
577         setup_remote_route(xport, dst_epid, src_epid);
578 
579         const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid);
580         const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid);
581 
582         // If requested, send transactions to reset and flush endpoints
583         if (reset) {
584             // Reset source and destination (in that order)
585             for (size_t i = 0; i < 2; i++) {
586                 mgmt_payload rst_xact;
587                 rst_xact.set_header(my_epid, _protover, _chdr_w);
588                 _traverse_to_node(rst_xact, (i == 0) ? src_node_addr : dst_node_addr);
589                 mgmt_hop_t rst_hop;
590                 rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
591                     mgmt_op_t::cfg_payload(REG_RESET_AND_FLUSH,
592                         (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM)));
593                 rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
594                 rst_xact.add_hop(rst_hop);
595                 _send_recv_mgmt_transaction(xport, rst_xact);
596             }
597         }
598 
599         // Build a management transaction to configure the source node
600         {
601             mgmt_payload cfg_xact;
602             cfg_xact.set_header(my_epid, _protover, _chdr_w);
603             _traverse_to_node(cfg_xact, src_node_addr);
604             mgmt_hop_t cfg_hop;
605             // Set destination of the stream to dst_epid
606             cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
607                 mgmt_op_t::cfg_payload(REG_OSTRM_DST_EPID, dst_epid)));
608             // Configure flow control parameters
609             _push_ostrm_flow_control_config(
610                 lossy_xport, BUFF_U64, BUFF_U64, false, fc_freq, fc_headroom, cfg_hop);
611             // Return the packet back to us
612             cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
613 
614             // Send the transaction and receive a response.
615             // We don't care about the contents of the response.
616             cfg_xact.add_hop(cfg_hop);
617             _send_recv_mgmt_transaction(xport, cfg_xact);
618         }
619 
620         // Wait for stream configuration to finish on the HW side
621         _validate_stream_setup(xport, src_node_addr, timeout);
622 
623         UHD_LOG_DEBUG("RFNOC::MGMT",
624             (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid
625                 % dst_epid));
626 
627         // Return discovered buffer parameters
628         return std::get<1>(_get_ostrm_status(xport, src_node_addr));
629     }
630 
631 
register_xport_hop_cfg_fns(uint8_t xport_subtype,xport_cfg_fn_t init_hop_cfg_fn,xport_cfg_fn_t rtcfg_hop_cfg_fn)632     virtual void register_xport_hop_cfg_fns(uint8_t xport_subtype,
633         xport_cfg_fn_t init_hop_cfg_fn,
634         xport_cfg_fn_t rtcfg_hop_cfg_fn)
635     {
636         _init_cfg_fns[xport_subtype]  = init_hop_cfg_fn;
637         _rtcfg_cfg_fns[xport_subtype] = rtcfg_hop_cfg_fn;
638     }
639 
640 
641 private: // Functions
642     // Discover all nodes that are reachable from this software stream endpoint
_discover_topology(chdr_ctrl_xport & xport)643     void _discover_topology(chdr_ctrl_xport& xport)
644     {
645         // Initialize a queue of pending paths. We will use this for a breadth-first
646         // traversal of the dataflow graph. The queue consists of a previously discovered
647         // node and the next destination to take from that node.
648         std::queue<std::pair<node_id_t, next_dest_t>> pending_paths;
649         auto my_epid = xport.get_epid();
650 
// Add ourselves to the pending queue to kick off the search
652         UHD_LOG_DEBUG("RFNOC::MGMT",
653             "Starting topology discovery from " << _my_node_id.to_string());
654         bool is_first_path = true;
655         pending_paths.push(std::make_pair(_my_node_id, next_dest_t(-1)));
656 
657         while (not pending_paths.empty()) {
658             // Pop the next path to discover from the pending queue
659             const auto& next_path = pending_paths.front();
660             pending_paths.pop();
661 
662             // We need to build a node_addr_t to allow us to get to next_path
663             // To do so we first lookup how to get to next_path.first. This location has
664             // already been discovered so we should just be able to look it up in
665             // _node_addr_map. The only exception for that is when we are just starting
666             // out, in which case our previous node is "us".
667             node_addr_t next_addr = is_first_path ? node_addr_t()
668                                                   : _node_addr_map.at(next_path.first);
669             // Once we know how to get to the base node, then add the next destination
670             next_addr.push_back(next_path);
671             is_first_path = false;
672 
673             // Build a management transaction to first get to our destination so that we
674             // can ask it to identify itself
675             mgmt_payload route_xact;
676             route_xact.set_header(my_epid, _protover, _chdr_w);
677             _traverse_to_node(route_xact, next_addr);
678 
679             // Discover downstream node (we ask the node to identify itself)
680             mgmt_payload disc_req_xact(route_xact);
681             // Push a node discovery hop
682             mgmt_hop_t disc_hop;
683             disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_INFO_REQ));
684             disc_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
685             disc_req_xact.add_hop(disc_hop);
686 
687             node_id_t new_node;
688             try {
689                 // Send the discovery transaction
690                 const mgmt_payload disc_resp_xact =
691                     _send_recv_mgmt_transaction(xport, disc_req_xact);
692                 new_node = _pop_node_discovery_hop(disc_resp_xact);
693             } catch (uhd::io_error& io_err) {
694                 // We received an IO error. This could happen if we have a legitimate
695                 // error or if there is no node to discover downstream. We can't tell for
696                 // sure why but we can guess. If the next_path for this node is -1 then we
697                 // expect something to be here, in which case we treat this as a
698                 // legitimate error. In all other cases we assume that there was nothing
699                 // to discover downstream.
700                 if (next_path.second < 0) {
701                     throw io_err;
702                 } else {
703                     // Move to the next pending path
704                     UHD_LOG_TRACE("RFNOC::MGMT",
705                         "Nothing connected on " << next_path.first.to_string() << "->"
706                                                 << next_path.second
707                                                 << ". Ignoring that path.");
708                     continue;
709                 }
710             }
711 
712             // We found a node!
713             // First check if we have already seen this node in the past. If not, we have
714             // to add it to our internal data structures. If we have already seen it then
715             // we just skip it. It is OK to skip the node because we are doing a BFS,
716             // which means that the first time a node is discovered during the traversal,
717             // the distance from this EP to that node will be the shortest path. The core
718             // design philosophy for RFNoC is that the data will always take the shortest
719             // path, because we make the assumption that a shorter path *always* has
720             // better QoS compared to a longer one. If this assumption is not true, we
721             // have to handle ordering by QoS for which we need to modify this search a
722             // bit and provide QoS preferences in the API. That may be a future feature.
723             if (_node_addr_map.count(new_node) > 0) {
724                 UHD_LOG_DEBUG("RFNOC::MGMT",
725                     "Re-discovered node " << new_node.to_string() << ". Skipping it");
726             } else {
727                 UHD_LOG_DEBUG("RFNOC::MGMT", "Discovered node " << new_node.to_string());
728                 _node_addr_map[new_node] = next_addr;
729 
730                 // Initialize the node (first time config)
731                 mgmt_payload init_req_xact(route_xact);
732                 _push_node_init_hop(init_req_xact, new_node, my_epid);
733                 const mgmt_payload init_resp_xact =
734                     _send_recv_mgmt_transaction(xport, init_req_xact);
735                 UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string());
736 
737                 // If the new node is a stream endpoint then we are done traversing this
738                 // path. If not, then check all ports downstream of the new node and add
739                 // them to pending_paths for further traversal
740                 switch (new_node.type) {
741                     case NODE_TYPE_XBAR: {
742                         // Total ports on this crossbar
743                         size_t nports =
744                             static_cast<size_t>(new_node.extended_info & 0xFF);
745                         // Total transport ports on this crossbar (the first nports_xport
746                         // ports are transport ports)
747                         size_t nports_xport =
748                             static_cast<size_t>((new_node.extended_info >> 8) & 0xFF);
749                         // When we allow daisy chaining, we need to recursively check
750                         // other transports
751                         size_t start_port = ALLOW_DAISY_CHAINING ? 0 : nports_xport;
752                         for (size_t i = start_port; i < nports; i++) {
753                             // Skip the current port because it's the input
754                             if (i != static_cast<size_t>(new_node.inst)) {
755                                 // If there is a single downstream port then do nothing
756                                 pending_paths.push(std::make_pair(
757                                     new_node, static_cast<next_dest_t>(i)));
758                             }
759                         }
760                         UHD_LOG_TRACE("RFNOC::MGMT",
761                             "* " << new_node.to_string() << " has " << nports
762                                  << " ports, " << nports_xport
763                                  << " transports and we are hooked up on port "
764                                  << new_node.inst);
765                     } break;
766                     case NODE_TYPE_STRM_EP: {
767                         // Stop searching when we find a stream endpoint
768                         // Add the endpoint to the discovered endpoint vector
769                         _discovered_ep_set.insert(
770                             sep_addr_t(new_node.device_id, new_node.inst));
771                     } break;
772                     case NODE_TYPE_XPORT: {
773                         // A transport has only one output. We don't need to take
774                         // any action to reach
775                         pending_paths.push(std::make_pair(new_node, -1));
776                     } break;
777                     default: {
778                         UHD_THROW_INVALID_CODE_PATH();
779                         break;
780                     }
781                 }
782             }
783         }
784     }
785 
786     // Add hops to the management transaction to reach the specified node
_traverse_to_node(mgmt_payload & transaction,const node_addr_t & node_addr)787     void _traverse_to_node(mgmt_payload& transaction, const node_addr_t& node_addr)
788     {
789         for (const auto& addr_pair : node_addr) {
790             const node_id_t& curr_node   = addr_pair.first;
791             const next_dest_t& curr_dest = addr_pair.second;
792             if (curr_node.type != NODE_TYPE_STRM_EP) {
793                 // If a node is a crossbar, then it have have a non-negative destination
794                 UHD_ASSERT_THROW((curr_node.type != NODE_TYPE_XBAR || curr_dest >= 0));
795                 _push_advance_hop(transaction, curr_dest);
796             } else {
797                 // This is a stream endpoint. Nothing needs to be done to advance
798                 // here. The behavior of this operation is identical whether or
799                 // not the stream endpoint is in software or not.
800             }
801         }
802     }
803 
804     // Add a hop to the transaction simply to get to the next node
_push_advance_hop(mgmt_payload & transaction,const next_dest_t & next_dst)805     void _push_advance_hop(mgmt_payload& transaction, const next_dest_t& next_dst)
806     {
807         if (next_dst >= 0) {
808             mgmt_hop_t sel_dest_hop;
809             sel_dest_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_SEL_DEST,
810                 mgmt_op_t::sel_dest_payload(static_cast<uint16_t>(next_dst))));
811             transaction.add_hop(sel_dest_hop);
812         } else {
813             mgmt_hop_t nop_hop;
814             nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
815             transaction.add_hop(nop_hop);
816         }
817     }
818 
819     // Add operations to a hop to configure flow control for an output stream
_push_ostrm_flow_control_config(const bool lossy_xport,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const bool byte_swap,const stream_buff_params_t & fc_freq,const stream_buff_params_t & fc_headroom,mgmt_hop_t & hop)820     void _push_ostrm_flow_control_config(const bool lossy_xport,
821         const sw_buff_t pyld_buff_fmt,
822         const sw_buff_t mdata_buff_fmt,
823         const bool byte_swap,
824         const stream_buff_params_t& fc_freq,
825         const stream_buff_params_t& fc_headroom,
826         mgmt_hop_t& hop)
827     {
828         // Validate flow control parameters
829         if (fc_freq.bytes > MAX_FC_FREQ_BYTES || fc_freq.packets > MAX_FC_FREQ_PKTS) {
830             throw uhd::value_error("Flow control frequency parameters out of bounds");
831         }
832         if (fc_headroom.bytes > MAX_FC_HEADROOM_BYTES
833             || fc_headroom.packets > MAX_FC_HEADROOM_PKTS) {
834             throw uhd::value_error("Flow control headroom parameters out of bounds");
835         }
836 
837         // Add flow control parameters to hop
838         hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
839             mgmt_op_t::cfg_payload(REG_OSTRM_FC_FREQ_BYTES_LO,
840                 static_cast<uint32_t>(fc_freq.bytes & uint64_t(0xFFFFFFFF)))));
841         hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
842             mgmt_op_t::cfg_payload(
843                 REG_OSTRM_FC_FREQ_BYTES_HI, static_cast<uint32_t>(fc_freq.bytes >> 32))));
844         hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
845             mgmt_op_t::cfg_payload(
846                 REG_OSTRM_FC_FREQ_PKTS, static_cast<uint32_t>(fc_freq.packets))));
847         const uint32_t headroom_reg =
848             (static_cast<uint32_t>(fc_headroom.bytes) & 0xFFFF)
849             | ((static_cast<uint32_t>(fc_headroom.packets) & 0xFF) << 16);
850         hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
851             mgmt_op_t::cfg_payload(REG_OSTRM_FC_HEADROOM, headroom_reg)));
852         // Configure buffer types and lossy_xport, then start configuration
853         hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
854             mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS,
855                 BUILD_CTRL_STATUS_WORD(
856                     true, lossy_xport, pyld_buff_fmt, mdata_buff_fmt, byte_swap))));
857     }
858 
859     // Send/recv a management transaction that will get the output stream status
_get_ostrm_status(chdr_ctrl_xport & xport,const node_addr_t & node_addr)860     std::tuple<uint32_t, stream_buff_params_t> _get_ostrm_status(
861         chdr_ctrl_xport& xport, const node_addr_t& node_addr)
862     {
863         auto my_epid = xport.get_epid();
864         // Build a management transaction to first get to the node
865         mgmt_payload status_xact;
866         status_xact.set_header(my_epid, _protover, _chdr_w);
867         _traverse_to_node(status_xact, node_addr);
868 
869         // Read all the status registers
870         mgmt_hop_t cfg_hop;
871         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ,
872             mgmt_op_t::cfg_payload(REG_OSTRM_CTRL_STATUS)));
873         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ,
874             mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_LO)));
875         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ,
876             mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_BYTES_HI)));
877         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_RD_REQ,
878             mgmt_op_t::cfg_payload(REG_OSTRM_BUFF_CAP_PKTS)));
879         cfg_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
880         status_xact.add_hop(cfg_hop);
881 
882         // Send the transaction, receive a response and validate it
883         const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact);
884         if (resp_xact.get_num_hops() != 1) {
885             throw uhd::op_failed("Management operation failed. Incorrect format (hops).");
886         }
887         const mgmt_hop_t& rhop = resp_xact.get_hop(0);
888         if (rhop.get_num_ops() <= 1
889             || rhop.get_op(0).get_op_code() != mgmt_op_t::MGMT_OP_NOP) {
890             throw uhd::op_failed(
891                 "Management operation failed. Incorrect format (operations).");
892         }
893         for (size_t i = 1; i < rhop.get_num_ops(); i++) {
894             if (rhop.get_op(i).get_op_code() != mgmt_op_t::MGMT_OP_CFG_RD_RESP) {
895                 throw uhd::op_failed(
896                     "Management operation failed. Incorrect format (operations).");
897             }
898         }
899 
900         // Extract peek data from transaction
901         mgmt_op_t::cfg_payload status_pl    = rhop.get_op(1).get_op_payload();
902         mgmt_op_t::cfg_payload cap_bytes_lo = rhop.get_op(2).get_op_payload();
903         mgmt_op_t::cfg_payload cap_bytes_hi = rhop.get_op(3).get_op_payload();
904         mgmt_op_t::cfg_payload cap_pkts     = rhop.get_op(4).get_op_payload();
905 
906         stream_buff_params_t buff_params;
907         buff_params.bytes = static_cast<uint64_t>(cap_bytes_lo.data)
908                             | (static_cast<uint64_t>(cap_bytes_hi.data) << 32);
909         buff_params.packets = static_cast<uint32_t>(cap_pkts.data);
910         return std::make_tuple(status_pl.data, buff_params);
911     }
912 
913     // Make sure that stream setup is complete and successful, else throw exception
_validate_stream_setup(chdr_ctrl_xport & xport,const node_addr_t & node_addr,const double timeout)914     void _validate_stream_setup(
915         chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout)
916     {
917         // Get the status of the output stream
918         uint32_t ostrm_status = 0;
919         double sleep_s        = 0.05;
920         for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) {
921             ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr));
922             if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {
923                 // Wait and retry
924                 std::chrono::milliseconds(static_cast<int64_t>(sleep_s * 1000));
925             } else {
926                 // Configuration is done
927                 break;
928             }
929         }
930 
931         if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) {
932             throw uhd::op_timeout("config_stream: Operation timed out");
933         }
934         if ((ostrm_status & STRM_STATUS_SETUP_ERR) != 0) {
935             throw uhd::op_failed("config_stream: Setup failure");
936         }
937         if ((ostrm_status & STRM_STATUS_FC_ENABLED) == 0) {
938             throw uhd::op_failed("config_stream: Flow control negotiation failed");
939         }
940     }
941 
942 
943     // Pop a node discovery response from a transaction and parse it
_pop_node_discovery_hop(const mgmt_payload & transaction)944     const node_id_t _pop_node_discovery_hop(const mgmt_payload& transaction)
945     {
946         if (transaction.get_num_hops() != 1) {
947             throw uhd::op_failed("Management operation failed. Incorrect format (hops).");
948         }
949         const mgmt_hop_t& rhop     = transaction.get_hop(0);
950         const mgmt_op_t& nop_resp  = rhop.get_op(0);
951         const mgmt_op_t& info_resp = rhop.get_op(1);
952         if (rhop.get_num_ops() <= 1 || nop_resp.get_op_code() != mgmt_op_t::MGMT_OP_NOP
953             || info_resp.get_op_code() != mgmt_op_t::MGMT_OP_INFO_RESP) {
954             throw uhd::op_failed(
955                 "Management operation failed. Incorrect format (operations).");
956         }
957         mgmt_op_t::node_info_payload resp_pl(info_resp.get_op_payload());
958         return std::move(node_id_t(resp_pl.device_id,
959             static_cast<node_type>(resp_pl.node_type),
960             resp_pl.node_inst,
961             resp_pl.ext_info));
962     }
963 
964     // Push a hop onto a transaction to initialize the current node
_push_node_init_hop(mgmt_payload & transaction,const node_id_t & node,const sep_id_t & my_epid)965     void _push_node_init_hop(
966         mgmt_payload& transaction, const node_id_t& node, const sep_id_t& my_epid)
967     {
968         mgmt_hop_t init_hop;
969         switch (node.type) {
970             case NODE_TYPE_XBAR: {
971                 // Configure the routing table to route all packets going to my_epid back
972                 // to the port where the packet is entering
973                 // The address for the transaction is the EPID and the data is the port #
974                 init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_CFG_WR_REQ,
975                     mgmt_op_t::cfg_payload(my_epid, node.inst)));
976             } break;
977             case NODE_TYPE_STRM_EP: {
978                 // Do nothing
979                 init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
980             } break;
981             case NODE_TYPE_XPORT: {
982                 uint8_t node_subtype = static_cast<uint8_t>(node.extended_info & 0xFF);
983                 // Run a hop configuration function for custom transports
984                 if (_rtcfg_cfg_fns.count(node_subtype)) {
985                     _rtcfg_cfg_fns.at(node_subtype)(
986                         node.device_id, node.inst, node_subtype, init_hop);
987                 } else {
988                     // For a generic transport, just advertise the transaction to the
989                     // outside world. The generic xport adapter will do the rest
990                     init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_ADVERTISE));
991                 }
992             } break;
993             default: {
994                 UHD_THROW_INVALID_CODE_PATH();
995             } break;
996         }
997         init_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN));
998         transaction.add_hop(init_hop);
999     }
1000 
1001     // Lookup the full address of a stream endpoint node given the EPID
_lookup_sep_node_addr(const sep_id_t & epid)1002     const node_addr_t& _lookup_sep_node_addr(const sep_id_t& epid)
1003     {
1004         // Lookup the destination node address using the endpoint ID
1005         if (_epid_addr_map.count(epid) == 0) {
1006             throw uhd::lookup_error(
1007                 "Could not find a stream endpoint with the requested ID.");
1008         }
1009         node_id_t sep_node(_epid_addr_map.at(epid));
1010         // If a node is in _epid_addr_map then it must be in _node_addr_map
1011         UHD_ASSERT_THROW(_node_addr_map.count(sep_node) > 0);
1012         return _node_addr_map.at(sep_node);
1013     }
1014 
1015     // Send the specified management transaction to the device
_send_mgmt_transaction(chdr_ctrl_xport & xport,const mgmt_payload & payload,double timeout=0.1)1016     void _send_mgmt_transaction(
1017         chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1)
1018     {
1019         chdr_header header;
1020         header.set_pkt_type(PKT_TYPE_MGMT);
1021         header.set_num_mdata(0);
1022         header.set_seq_num(_send_seqnum++);
1023         header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8));
1024         header.set_dst_epid(0);
1025 
1026         auto send_buff = xport.get_send_buff(timeout * 1000);
1027         if (not send_buff) {
1028             UHD_LOG_ERROR(
1029                 "RFNOC::MGMT", "Timed out getting send buff for management transaction");
1030             throw uhd::io_error("Timed out getting send buff for management transaction");
1031         }
1032         _send_pkt->refresh(send_buff->data(), header, payload);
1033         send_buff->set_packet_size(header.get_length());
1034         xport.release_send_buff(std::move(send_buff));
1035     }
1036 
1037     // Send the specified management transaction to the device and receive a response
_send_recv_mgmt_transaction(chdr_ctrl_xport & xport,const mgmt_payload & transaction,double timeout=0.1)1038     const mgmt_payload _send_recv_mgmt_transaction(
1039         chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1)
1040     {
1041         auto my_epid = xport.get_epid();
1042         mgmt_payload send(transaction);
1043         send.set_header(my_epid, _protover, _chdr_w);
1044         // If we are expecting to receive a response then we have to add an additional
1045         // NO-OP hop for the receive endpoint. All responses will be appended to this hop.
1046         mgmt_hop_t nop_hop;
1047         nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP));
1048         send.add_hop(nop_hop);
1049         // Send the transaction over the wire
1050         _send_mgmt_transaction(xport, send);
1051 
1052         auto mgmt_buff = xport.get_mgmt_buff(timeout * 1000);
1053         if (not mgmt_buff) {
1054             throw uhd::io_error("Timed out getting recv buff for management transaction");
1055         }
1056         _recv_pkt->refresh(mgmt_buff->data());
1057         mgmt_payload recv;
1058         recv.set_header(my_epid, _protover, _chdr_w);
1059         _recv_pkt->fill_payload(recv);
1060         xport.release_mgmt_buff(std::move(mgmt_buff));
1061         return recv;
1062     }
1063 
1064 private: // Members
1065     // The software RFNoC protocol version
1066     const uint16_t _protover;
1067     // CHDR Width for this design/application
1068     const chdr_w_t _chdr_w;
1069     // Endianness for the transport
1070     const endianness_t _endianness;
1071     // The node ID for this software endpoint
1072     const node_id_t _my_node_id;
1073     // A table that maps a node_id_t to a node_addr_t. This map allows looking up the
1074     // address of a node given the node ID. There may be multiple ways to get to the
1075     // node but we only store the shortest path here.
1076     std::map<node_id_t, node_addr_t> _node_addr_map;
1077     // A list of all discovered endpoints
1078     std::set<sep_addr_t> _discovered_ep_set;
1079     // A table that maps a stream endpoint ID to the physical address of the stream
1080     // endpoint. This is a cache of the values from the epid_allocator
1081     std::map<sep_id_t, sep_addr_t> _epid_addr_map;
    // Sequence number for the next management packet that is sent
1083     size_t _send_seqnum;
1084     // Management packet containers
1085     chdr_mgmt_packet::uptr _send_pkt;
1086     chdr_mgmt_packet::cuptr _recv_pkt;
1087     // Hop configuration function maps
1088     std::map<uint8_t, xport_cfg_fn_t> _init_cfg_fns;
1089     std::map<uint8_t, xport_cfg_fn_t> _rtcfg_cfg_fns;
1090     // Mutex that protects all state in this class
1091     mutable std::recursive_mutex _mutex;
1092 }; // namespace mgmt
1093 
1094 
make(chdr_ctrl_xport & xport,const chdr::chdr_packet_factory & pkt_factory,sep_addr_t my_sep_addr)1095 mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport,
1096     const chdr::chdr_packet_factory& pkt_factory,
1097     sep_addr_t my_sep_addr)
1098 {
1099     return std::make_unique<mgmt_portal_impl>(xport, pkt_factory, my_sep_addr);
1100 }
1101 
1102 }}} // namespace uhd::rfnoc::mgmt
1103