1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6
7 #include <uhd/exception.hpp>
8 #include <uhd/utils/log.hpp>
9 #include <uhdlib/rfnoc/graph_stream_manager.hpp>
10 #include <uhdlib/rfnoc/link_stream_manager.hpp>
11 #include <uhdlib/transport/links.hpp>
12 #include <boost/format.hpp>
13 #include <map>
14 #include <memory>
15 #include <set>
16
17 using namespace uhd;
18 using namespace uhd::rfnoc;
19 using namespace uhd::rfnoc::chdr;
20
21 graph_stream_manager::~graph_stream_manager() = default;
22
23 class graph_stream_manager_impl : public graph_stream_manager
24 {
25 public:
graph_stream_manager_impl(const chdr::chdr_packet_factory & pkt_factory,const epid_allocator::sptr & epid_alloc,const std::vector<std::pair<device_id_t,mb_iface * >> & links)26 graph_stream_manager_impl(const chdr::chdr_packet_factory& pkt_factory,
27 const epid_allocator::sptr& epid_alloc,
28 const std::vector<std::pair<device_id_t, mb_iface*>>& links)
29 : _epid_alloc(epid_alloc)
30 {
31 for (const auto& lnk : links) {
32 UHD_ASSERT_THROW(lnk.second);
33 _link_mgrs.emplace(lnk.first,
34 std::move(link_stream_manager::make(
35 pkt_factory, *lnk.second, epid_alloc, lnk.first)));
36 auto adapter = _link_mgrs.at(lnk.first)->get_adapter_id();
37 if (_alloc_map.count(adapter) == 0) {
38 _alloc_map[adapter] = allocation_info{0, 0};
39 }
40 }
41 for (const auto& mgr_pair : _link_mgrs) {
42 for (const auto& ep : mgr_pair.second->get_reachable_endpoints()) {
43 // Add the (potential) destinations to the
44 _reachable_endpoints.insert(ep);
45 // Add entry to source map
46 if (_src_map.count(ep) == 0) {
47 _src_map[ep] = std::vector<device_id_t>();
48 }
49 _src_map[ep].push_back(mgr_pair.first);
50 }
51 }
52 }
53
54 virtual ~graph_stream_manager_impl() = default;
55
get_reachable_endpoints() const56 virtual const std::set<sep_addr_t>& get_reachable_endpoints() const
57 {
58 return _reachable_endpoints;
59 }
60
get_local_devices() const61 virtual std::vector<device_id_t> get_local_devices() const
62 {
63 std::vector<device_id_t> retval;
64 for (const auto& mgr_pair : _link_mgrs) {
65 retval.push_back(mgr_pair.first);
66 }
67 return retval;
68 }
69
connect_host_to_device(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID)70 virtual sep_id_pair_t connect_host_to_device(sep_addr_t dst_addr,
71 uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID)
72 {
73 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
74 << boost::format("Connecting the Host to Endpoint %d:%d through Adapter "
75 "%d (0 = no preference)... ")
76 % dst_addr.first % dst_addr.second % adapter;
77
78 // When we connect, we setup a route and fire up a control stream between
79 // the endpoints
80 device_id_t gateway =
81 _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
82 sep_id_pair_t epid_pair =
83 _link_mgrs.at(gateway)->connect_host_to_device(dst_addr);
84 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
85 << boost::format("Connection to Endpoint %d:%d completed through Device %d. "
86 "Using EPIDs %d -> %d.")
87 % dst_addr.first % dst_addr.second % gateway % epid_pair.first
88 % epid_pair.second;
89
90 return epid_pair;
91 }
92
connect_device_to_device(sep_addr_t dst_addr,sep_addr_t src_addr)93 virtual sep_id_pair_t connect_device_to_device(
94 sep_addr_t dst_addr, sep_addr_t src_addr)
95 {
96 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
97 << boost::format("Connecting the Endpoint %d:%d to Endpoint %d:%d...")
98 % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;
99
100 // Iterate through all link managers and check if they are capable of connecting
101 // the requested endpoints. If no one can connect then the endpoints may actually
102 // not share a common crossbar or we don't have enough connectivity in the
103 // software session to reach the common crossbar.
104 for (auto& kv : _link_mgrs) {
105 if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
106 sep_id_pair_t epid_pair =
107 kv.second->connect_device_to_device(dst_addr, src_addr);
108 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
109 << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
110 "completed through Device %d. Using "
111 "EPIDs %d -> %d.")
112 % src_addr.first % src_addr.second % dst_addr.first
113 % dst_addr.second % kv.first % epid_pair.first
114 % epid_pair.second;
115 return epid_pair;
116 }
117 }
118 throw uhd::routing_error("The specified destination is unreachable from the "
119 "specified source endpoint");
120 }
121
get_block_register_iface(sep_addr_t dst_addr,uint16_t block_index,const clock_iface & client_clk,const clock_iface & timebase_clk,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID)122 virtual ctrlport_endpoint::sptr get_block_register_iface(sep_addr_t dst_addr,
123 uint16_t block_index,
124 const clock_iface& client_clk,
125 const clock_iface& timebase_clk,
126 uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID)
127 {
128 // We must be connected to dst_addr before getting a register iface
129 sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
130 auto dev =
131 _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
132 return _link_mgrs.at(dev)->get_block_register_iface(
133 dst_epid, block_index, client_clk, timebase_clk);
134 }
135
get_client_zero(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter=uhd::transport::NULL_ADAPTER_ID) const136 virtual detail::client_zero::sptr get_client_zero(sep_addr_t dst_addr,
137 uhd::transport::adapter_id_t adapter = uhd::transport::NULL_ADAPTER_ID) const
138 {
139 // We must be connected to dst_addr before getting a client zero
140 sep_id_t dst_epid = _epid_alloc->get_epid(dst_addr);
141 auto dev =
142 _check_dst_and_find_src(dst_addr, adapter, uhd::transport::link_type_t::CTRL);
143 return _link_mgrs.at(dev)->get_client_zero(dst_epid);
144 }
145
146 virtual std::tuple<sep_id_pair_t, stream_buff_params_t>
create_device_to_device_data_stream(const sep_addr_t dst_addr,const sep_addr_t src_addr,const bool lossy_xport,const double fc_freq_ratio,const double fc_headroom_ratio,const bool reset=false)147 create_device_to_device_data_stream(const sep_addr_t dst_addr,
148 const sep_addr_t src_addr,
149 const bool lossy_xport,
150 const double fc_freq_ratio,
151 const double fc_headroom_ratio,
152 const bool reset = false)
153 {
154 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
155 << boost::format(
156 "Initializing data stream from Endpoint %d:%d to Endpoint %d:%d...")
157 % src_addr.first % src_addr.second % dst_addr.first % dst_addr.second;
158
159 // Iterate through all link managers and check if they are capable of connecting
160 // the requested endpoints. If no one can connect then the endpoints may actually
161 // not share a common crossbar or we don't have enough connectivity in the
162 // software session to reach the common crossbar.
163 for (auto& kv : _link_mgrs) {
164 if (kv.second->can_connect_device_to_device(dst_addr, src_addr)) {
165 sep_id_pair_t epid_pair =
166 kv.second->connect_device_to_device(dst_addr, src_addr);
167 UHD_LOGGER_DEBUG("RFNOC::GRAPH")
168 << boost::format("Connection from Endpoint %d:%d to Endpoint %d:%d "
169 "completed through Device %d. Using "
170 "EPIDs %d -> %d.")
171 % src_addr.first % src_addr.second % dst_addr.first
172 % dst_addr.second % kv.first % epid_pair.first
173 % epid_pair.second;
174 stream_buff_params_t buff_params =
175 kv.second->create_device_to_device_data_stream(epid_pair.second,
176 epid_pair.first,
177 lossy_xport,
178 fc_freq_ratio,
179 fc_headroom_ratio,
180 reset);
181 return std::make_tuple(epid_pair, buff_params);
182 }
183 }
184 throw uhd::routing_error("The specified destination is unreachable from the "
185 "specified source endpoint");
186 }
187
create_device_to_host_data_stream(const sep_addr_t src_addr,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const uhd::transport::adapter_id_t adapter,const device_addr_t & xport_args,const std::string & streamer_id)188 chdr_rx_data_xport::uptr create_device_to_host_data_stream(const sep_addr_t src_addr,
189 const sw_buff_t pyld_buff_fmt,
190 const sw_buff_t mdata_buff_fmt,
191 const uhd::transport::adapter_id_t adapter,
192 const device_addr_t& xport_args,
193 const std::string& streamer_id)
194 {
195 device_id_t dev = _check_dst_and_find_src(
196 src_addr, adapter, uhd::transport::link_type_t::RX_DATA);
197 uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
198 auto allocs = _alloc_map.at(chosen);
199 allocs.rx++;
200 _alloc_map[chosen] = allocs;
201 return _link_mgrs.at(dev)->create_device_to_host_data_stream(
202 src_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
203 }
204
create_host_to_device_data_stream(sep_addr_t dst_addr,const sw_buff_t pyld_buff_fmt,const sw_buff_t mdata_buff_fmt,const uhd::transport::adapter_id_t adapter,const device_addr_t & xport_args,const std::string & streamer_id)205 virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream(
206 sep_addr_t dst_addr,
207 const sw_buff_t pyld_buff_fmt,
208 const sw_buff_t mdata_buff_fmt,
209 const uhd::transport::adapter_id_t adapter,
210 const device_addr_t& xport_args,
211 const std::string& streamer_id)
212 {
213 device_id_t dev = _check_dst_and_find_src(
214 dst_addr, adapter, uhd::transport::link_type_t::TX_DATA);
215 uhd::transport::adapter_id_t chosen = _link_mgrs.at(dev)->get_adapter_id();
216 auto allocs = _alloc_map.at(chosen);
217 allocs.tx++;
218 _alloc_map[chosen] = allocs;
219 return _link_mgrs.at(dev)->create_host_to_device_data_stream(
220 dst_addr, pyld_buff_fmt, mdata_buff_fmt, xport_args, streamer_id);
221 }
222
get_adapters(sep_addr_t addr) const223 std::vector<uhd::transport::adapter_id_t> get_adapters(sep_addr_t addr) const
224 {
225 auto adapters = std::vector<uhd::transport::adapter_id_t>();
226 if (_src_map.count(addr) > 0) {
227 const auto& src_devs = _src_map.at(addr);
228 for (auto src : src_devs) {
229 // Returns in order specified in device args
230 // Assumption: the device_id_t will be incremented sequentially
231 // and the std::map will then provide the link_stream_managers
232 // in the same order as the adapters were specified
233 adapters.push_back(_link_mgrs.at(src)->get_adapter_id());
234 }
235 return adapters;
236 } else {
237 throw uhd::rfnoc_error("Specified address is unreachable. No via_devices.");
238 }
239 }
240
241 private:
_check_dst_and_find_src(sep_addr_t dst_addr,uhd::transport::adapter_id_t adapter,uhd::transport::link_type_t link_type) const242 device_id_t _check_dst_and_find_src(sep_addr_t dst_addr,
243 uhd::transport::adapter_id_t adapter,
244 uhd::transport::link_type_t link_type) const
245 {
246 if (_src_map.count(dst_addr) > 0) {
247 const auto& src_devs = _src_map.at(dst_addr);
248 if (adapter == uhd::transport::NULL_ADAPTER_ID) {
249 // TODO: Maybe we can come up with a better heuristic for when the user
250 // gives no preference
251 auto dev = src_devs[0];
252 auto dev_alloc = _alloc_map.at(_link_mgrs.at(dev)->get_adapter_id());
253 for (auto candidate : src_devs) {
254 auto candidate_alloc =
255 _alloc_map.at(_link_mgrs.at(candidate)->get_adapter_id());
256 switch (link_type) {
257 case uhd::transport::link_type_t::TX_DATA:
258 if (candidate_alloc.tx < dev_alloc.tx) {
259 dev = candidate;
260 dev_alloc = candidate_alloc;
261 }
262 break;
263 case uhd::transport::link_type_t::RX_DATA:
264 if (candidate_alloc.rx < dev_alloc.rx) {
265 dev = candidate;
266 dev_alloc = candidate_alloc;
267 }
268 break;
269 default:
270 // Just accept first device for CTRL and ASYNC_MSG
271 break;
272 }
273 }
274 return dev;
275 } else {
276 for (const auto& src : src_devs) {
277 if (_link_mgrs.at(src)->get_adapter_id() == adapter) {
278 return src;
279 }
280 }
281 throw uhd::rfnoc_error("Specified destination address is unreachable "
282 "from the via device");
283 }
284 } else {
285 throw uhd::rfnoc_error("Specified destination address is unreachable");
286 }
287 }
288
289 // The cached EPID allocator object
290 epid_allocator::sptr _epid_alloc;
291 // A map the contains all link manager indexed by the device ID
292 std::map<device_id_t, link_stream_manager::uptr> _link_mgrs;
293 // A set of the addresses of all devices reachable from this graph
294 std::set<sep_addr_t> _reachable_endpoints;
295 // A map of addresses that can be taken to reach a particular destination
296 std::map<sep_addr_t, std::vector<device_id_t>> _src_map;
297
298 // Data used for heuristic to determine which link to use
299 struct allocation_info
300 {
301 size_t rx;
302 size_t tx;
303 };
304
305 // A map of allocations for each host transport adapter
306 std::map<uhd::transport::adapter_id_t, allocation_info> _alloc_map;
307 };
308
make(const chdr::chdr_packet_factory & pkt_factory,const epid_allocator::sptr & epid_alloc,const std::vector<std::pair<device_id_t,mb_iface * >> & links)309 graph_stream_manager::uptr graph_stream_manager::make(
310 const chdr::chdr_packet_factory& pkt_factory,
311 const epid_allocator::sptr& epid_alloc,
312 const std::vector<std::pair<device_id_t, mb_iface*>>& links)
313 {
314 return std::make_unique<graph_stream_manager_impl>(pkt_factory, epid_alloc, links);
315 }
316