1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
7 #include <uhd/exception.hpp>
8 #include <uhd/utils/log.hpp>
9 #include <uhdlib/rfnoc/graph_stream_manager.hpp>
10 #include <uhdlib/rfnoc/link_stream_manager.hpp>
11 #include <uhdlib/transport/links.hpp>
12 #include <boost/format.hpp>
13 #include <map>
14 #include <memory>
15 #include <set>
16 
17 using namespace uhd;
18 using namespace uhd::rfnoc;
19 using namespace uhd::rfnoc::chdr;
20 
21 graph_stream_manager::~graph_stream_manager() = default;
22 
23 class graph_stream_manager_impl : public graph_stream_manager
24 {
25 public:
graph_stream_manager_impl(const chdr::chdr_packet_factory & pkt_factory,const epid_allocator::sptr & epid_alloc,const std::vector<std::pair<device_id_t,mb_iface * >> & links)26     graph_stream_manager_impl(const chdr::chdr_packet_factory& pkt_factory,
27         const epid_allocator::sptr& epid_alloc,
28         const std::vector<std::pair<device_id_t, mb_iface*>>& links)
29         : _epid_alloc(epid_alloc)
30     {
31         for (const auto& lnk : links) {
32             UHD_ASSERT_THROW(lnk.second);
33             _link_mgrs.emplace(lnk.first,
34                 std::move(link_stream_manager::make(
35                     pkt_factory, *lnk.second, epid_alloc, lnk.first)));
36             auto adapter = _link_mgrs.at(lnk.first)->get_adapter_id();
37             if (_alloc_map.count(adapter) == 0) {
38                 _alloc_map[adapter] = allocation_info{0, 0};
39             }
40         }
41         for (const auto& mgr_pair : _link_mgrs) {
42             for (const auto& ep : mgr_pair.second->get_reachable_endpoints()) {
43                 // Add the (potential) destinations to the
44                 _reachable_endpoints.insert(ep);
45                 // Add entry to source map
46                 if (_src_map.count(ep) == 0) {
47                     _src_map[ep] = std::vector<device_id_t>();
48                 }
49                 _src_map[ep].push_back(mgr_pair.first);
50             }
51         }
52     }
53 
54     virtual ~graph_stream_manager_impl() = default;
55 
get_reachable_endpoints() const56     virtual const std::set<sep_addr_t>& get_reachable_endpoints() const
57     {
58         return _reachable_endpoints;
59     }
60 
get_local_devices() const61     virtual std::vector<device_id_t> get_local_devices() const
62     {
63         std::vector<device_id_t> retval;
64         for (const auto& mgr_pair : _link_mgrs) {
65             retval.push_back(mgr_pair.first);
66         }
67         return retval;
68     }
69 
connect_host_to_device(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID)70     virtual sep_id_pair_t connect_host_to_device(sep_addr_t dst_addr,
71         uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID)
72     {
73         UHD_LOGGER_DEBUG("RFNOC::GRAPH")
74             << boost::format("Connecting the Host to Endpoint %d:%d through Adapter "
75                              "%d (0 = no preference)... ")
76                    % dst_addr.first % dst_addr.second % adapter;
77 
78         // When we connect, we setup a route and fire up a control stream between
79         // the endpoints
80         device_id_t gateway =
81             _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
82         sep_id_pair_t epid_pair =
83             _link_mgrs.at(gateway)->connect_host_to_device(dst_addr);
84         UHD_LOGGER_DEBUG("RFNOC::GRAPH")
85             << boost::format("Connection to Endpoint %d:%d completed through Device %d. "
86                              "Using EPIDs %d -> %d.")
87                    % dst_addr.first % dst_addr.second % gateway % epid_pair.first
88                    % epid_pair.second;
89 
90         return epid_pair;
91     }
92 
connect_device_to_device(sep_addr_t dst_addr,sep_addr_t src_addr)93     virtual sep_id_pair_t connect_device_to_device(
94         sep_addr_t dst_addr, sep_addr_t src_addr)
95     {
96         UHD_LOGGER_DEBUG("RFNOC::GRAPH")
97             << boost::format("Connecting the Endpoint %d:%d to Endpoint %d:%d...")
98                    % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;
99 
100         // Iterate through all link managers and check if they are capable of connecting
101         // the requested endpoints. If no one can connect then the endpoints may actually
102         // not share a common crossbar or we don't have enough connectivity in the
103         // software session to reach the common crossbar.
104         for (auto& kv : _link_mgrs) {
105             if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
106                 sep_id_pair_t epid_pair =
107                     kv.second->connect_device_to_device(dst_addr, src_addr);
108                 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
109                     << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
110                                      "completed through Device %d. Using "
111                                      "EPIDs %d -> %d.")
112                            % src_addr.first % src_addr.second % dst_addr.first
113                            % dst_addr.second % kv.first % epid_pair.first
114                            % epid_pair.second;
115                 return epid_pair;
116             }
117         }
118         throw uhd::routing_error("The specified destination is unreachable from the "
119                                  "specified source endpoint");
120     }
121 
get_block_register_iface(sep_addr_t dst_addr,uint16_t block_index,const clock_iface & client_clk,const clock_iface & timebase_clk,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID)122     virtual ctrlport_endpoint::sptr get_block_register_iface(sep_addr_t dst_addr,
123         uint16_t block_index,
124         const clock_iface& client_clk,
125         const clock_iface& timebase_clk,
126         uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID)
127     {
128         // We must be connected to dst_addr before getting a register iface
129         sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
130         auto dev =
131             _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
132         return _link_mgrs.at(dev)->get_block_register_iface(
133             dst_epid, block_index, client_clk, timebase_clk);
134     }
135 
get_client_zero(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID) const136     virtual detail::client_zero::sptr get_client_zero(sep_addr_t dst_addr,
137         uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID) const
138     {
139         // We must be connected to dst_addr before getting a client zero
140         sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
141         auto dev =
142             _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
143         return _link_mgrs.at(dev)->get_client_zero(dst_epid);
144     }
145 
146     virtual std::tuple<sep_id_pair_t, stream_buff_params_t>
create_device_to_device_data_stream(const sep_addr_t dst_addr,const sep_addr_t src_addr,const bool lossy_xport,const double fc_freq_ratio,const double fc_headroom_ratio,const bool reset=false)147     create_device_to_device_data_stream(const sep_addr_t dst_addr,
148         const sep_addr_t src_addr,
149         const bool lossy_xport,
150         const double fc_freq_ratio,
151         const double fc_headroom_ratio,
152         const bool reset = false)
153     {
154         UHD_LOGGER_DEBUG("RFNOC::GRAPH")
155             << boost::format(
156                    "Initializing data stream from Endpoint %d:%d to Endpoint %d:%d...")
157                    % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;
158 
159         // Iterate through all link managers and check if they are capable of connecting
160         // the requested endpoints. If no one can connect then the endpoints may actually
161         // not share a common crossbar or we don't have enough connectivity in the
162         // software session to reach the common crossbar.
163         for (auto& kv : _link_mgrs) {
164             if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
165                 sep_id_pair_t epid_pair =
166                     kv.second->connect_device_to_device(dst_addr, src_addr);
167                 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
168                     << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
169                                      "completed through Device %d. Using "
170                                      "EPIDs %d -> %d.")
171                            % src_addr.first % src_addr.second % dst_addr.first
172                            % dst_addr.second % kv.first % epid_pair.first
173                            % epid_pair.second;
174                 stream_buff_params_t buff_params =
175                     kv.second->create_device_to_device_data_stream(epid_pair.second,
176                         epid_pair.first,
177                         lossy_xport,
178                         fc_freq_ratio,
179                         fc_headroom_ratio,
180                         reset);
181                 return std::make_tuple(epid_pair, buff_params);
182             }
183         }
184         throw uhd::routing_error("The specified destination is unreachable from the "
185                                  "specified source endpoint");
186     }
187 
create_device_to_host_data_stream(const sep_addr_t src_addr,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const uhd::transport::adapter_id_t adapter,const device_addr_t & xport_args,const std::string & streamer_id)188     chdr_rx_data_xport::uptr create_device_to_host_data_stream(const sep_addr_t src_addr,
189         const sw_buff_t pyld_buff_fmt,
190         const sw_buff_t mdata_buff_fmt,
191         const uhd::transport::adapter_id_t adapter,
192         const device_addr_t& xport_args,
193         const std::string& streamer_id)
194     {
195         device_id_t dev = _check_dst_and_find_src(
196             src_addr, adapter, uhd::transport::link_type_t::RX_DATA);
197         uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
198         auto allocs                         = _alloc_map.at(chosen);
199         allocs.rx++;
200         _alloc_map[chosen] = allocs;
201         return _link_mgrs.at(dev)->create_device_to_host_data_stream(
202             src_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
203     }
204 
create_host_to_device_data_stream(sep_addr_t dst_addr,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const uhd::transport::adapter_id_t adapter,const device_addr_t & xport_args,const std::string & streamer_id)205     virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
206         sep_addr_t dst_addr,
207         const sw_buff_t pyld_buff_fmt,
208         const sw_buff_t mdata_buff_fmt,
209         const uhd::transport::adapter_id_t adapter,
210         const device_addr_t& xport_args,
211         const std::string& streamer_id)
212     {
213         device_id_t dev = _check_dst_and_find_src(
214             dst_addr, adapter, uhd::transport::link_type_t::TX_DATA);
215         uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
216         auto allocs                         = _alloc_map.at(chosen);
217         allocs.tx++;
218         _alloc_map[chosen] = allocs;
219         return _link_mgrs.at(dev)->create_host_to_device_data_stream(
220             dst_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
221     }
222 
get_adapters(sep_addr_t addr) const223     std::vector<uhd::transport::adapter_id_t> get_adapters(sep_addr_t addr) const
224     {
225         auto adapters = std::vector<uhd::transport::adapter_id_t>();
226         if (_src_map.count(addr) > 0) {
227             const auto& src_devs = _src_map.at(addr);
228             for (auto src : src_devs) {
229                 // Returns in order specified in device args
230                 // Assumption: the device_id_t will be incremented sequentially
231                 // and the std::map will then provide the link_stream_managers
232                 // in the same order as the adapters were specified
233                 adapters.push_back(_link_mgrs.at(src)->get_adapter_id());
234             }
235             return adapters;
236         } else {
237             throw uhd::rfnoc_error("Specified address is unreachable. No via_devices.");
238         }
239     }
240 
241 private:
_check_dst_and_find_src(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter,uhd::transport::link_type_t link_type) const242     device_id_t _check_dst_and_find_src(sep_addr_t dst_addr,
243         uhd::transport::adapter_id_t adapter,
244         uhd::transport::link_type_t link_type) const
245     {
246         if (_src_map.count(dst_addr) > 0) {
247             const auto& src_devs = _src_map.at(dst_addr);
248             if (adapter == uhd::transport::NULL_ADAPTER_ID) {
249                 // TODO: Maybe we can come up with a better heuristic for when the user
250                 // gives no preference
251                 auto dev       = src_devs[0];
252                 auto dev_alloc = _alloc_map.at(_link_mgrs.at(dev)->get_adapter_id());
253                 for (auto candidate : src_devs) {
254                     auto candidate_alloc =
255                         _alloc_map.at(_link_mgrs.at(candidate)->get_adapter_id());
256                     switch (link_type) {
257                         case uhd::transport::link_type_t::TX_DATA:
258                             if (candidate_alloc.tx < dev_alloc.tx) {
259                                 dev       = candidate;
260                                 dev_alloc = candidate_alloc;
261                             }
262                             break;
263                         case uhd::transport::link_type_t::RX_DATA:
264                             if (candidate_alloc.rx < dev_alloc.rx) {
265                                 dev       = candidate;
266                                 dev_alloc = candidate_alloc;
267                             }
268                             break;
269                         default:
270                             // Just accept first device for CTRL and ASYNC_MSG
271                             break;
272                     }
273                 }
274                 return dev;
275             } else {
276                 for (const auto& src : src_devs) {
277                     if (_link_mgrs.at(src)->get_adapter_id() == adapter) {
278                         return src;
279                     }
280                 }
281                 throw uhd::rfnoc_error("Specified destination address is unreachable "
282                                        "from the via device");
283             }
284         } else {
285             throw uhd::rfnoc_error("Specified destination address is unreachable");
286         }
287     }
288 
289     // The cached EPID allocator object
290     epid_allocator::sptr _epid_alloc;
291     // A map the contains all link manager indexed by the device ID
292     std::map<device_id_t, link_stream_manager::uptr> _link_mgrs;
293     // A set of the addresses of all devices reachable from this graph
294     std::set<sep_addr_t> _reachable_endpoints;
295     // A map of addresses that can be taken to reach a particular destination
296     std::map<sep_addr_t, std::vector<device_id_t>> _src_map;
297 
298     // Data used for heuristic to determine which link to use
299     struct allocation_info
300     {
301         size_t rx;
302         size_t tx;
303     };
304 
305     // A map of allocations for each host transport adapter
306     std::map<uhd::transport::adapter_id_t, allocation_info> _alloc_map;
307 };
308 
make(const chdr::chdr_packet_factory & pkt_factory,const epid_allocator::sptr & epid_alloc,const std::vector<std::pair<device_id_t,mb_iface * >> & links)309 graph_stream_manager::uptr graph_stream_manager::make(
310     const chdr::chdr_packet_factory& pkt_factory,
311     const epid_allocator::sptr& epid_alloc,
312     const std::vector<std::pair<device_id_t, mb_iface*>>& links)
313 {
314     return std::make_unique<graph_stream_manager_impl>(pkt_factory, epid_alloc, links);
315 }
316