1 //
2 // Copyright 2019 Ettus Research, a National Instruments brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
#include <uhd/utils/algorithm.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/transport/dpdk/arp.hpp>
#include <uhdlib/transport/dpdk/common.hpp>
#include <uhdlib/transport/dpdk/udp.hpp>
#include <uhdlib/transport/dpdk_io_service.hpp>
#include <uhdlib/utils/prefs.hpp>
#include <arpa/inet.h>
#include <rte_arp.h>
#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <cstdlib>
#include <cstring>
17 
18 namespace uhd { namespace transport { namespace dpdk {
19 
20 namespace {
// File-local defaults and fixed sizes used by the DPDK port/context setup.

// Presumably microseconds per second; not referenced in the visible code.
constexpr uint64_t USEC = 1000 * 1000;
// Fallback MTU when "dpdk_mtu" is not configured
constexpr size_t DEFAULT_FRAME_SIZE = 8000;
// Fallback for "dpdk_num_mbufs" (counted per queue)
constexpr int DEFAULT_NUM_MBUFS = 1024;
// Fallback for "dpdk_mbuf_cache_size"
constexpr int DEFAULT_MBUF_CACHE_SIZE = 315;
// Ethernet (14) + IPv4 (20) + UDP (8) header bytes
constexpr size_t DPDK_HEADERS_SIZE = 14 + 20 + 8;
// Fallback descriptor count when "dpdk_num_desc" is not configured
constexpr uint16_t DPDK_DEFAULT_RING_SIZE = 512;
// Fallback for "dpdk_link_timeout"; counted down in LINK_STATUS_INTERVAL steps
constexpr int DEFAULT_DPDK_LINK_INIT_TIMEOUT = 1000;
// Delay in milliseconds (handed to rte_delay_ms) between link status polls
constexpr int LINK_STATUS_INTERVAL = 250;
29 
eal_add_opt(std::vector<const char * > & argv,size_t n,char * dst,const char * opt,const char * arg)30 inline char* eal_add_opt(
31     std::vector<const char*>& argv, size_t n, char* dst, const char* opt, const char* arg)
32 {
33     UHD_LOG_TRACE("DPDK", opt << " " << arg);
34     char* ptr = dst;
35     strncpy(ptr, opt, n);
36     argv.push_back(ptr);
37     ptr += strlen(opt) + 1;
38     n -= ptr - dst;
39     strncpy(ptr, arg, n);
40     argv.push_back(ptr);
41     ptr += strlen(arg) + 1;
42     return ptr;
43 }
44 
separate_ipv4_addr(const std::string ipv4,uint32_t & ipv4_addr,uint32_t & netmask)45 inline void separate_ipv4_addr(
46     const std::string ipv4, uint32_t& ipv4_addr, uint32_t& netmask)
47 {
48     std::vector<std::string> result;
49     boost::algorithm::split(
50         result, ipv4, [](const char& in) { return in == '/'; }, boost::token_compress_on);
51     UHD_ASSERT_THROW(result.size() == 2);
52     ipv4_addr   = (uint32_t)inet_addr(result[0].c_str());
53     int netbits = std::atoi(result[1].c_str());
54     netmask     = htonl(0xffffffff << (32 - netbits));
55 }
56 } // namespace
57 
make(port_id_t port,size_t mtu,uint16_t num_queues,uint16_t num_desc,struct rte_mempool * rx_pktbuf_pool,struct rte_mempool * tx_pktbuf_pool,std::string ipv4_address)58 dpdk_port::uptr dpdk_port::make(port_id_t port,
59     size_t mtu,
60     uint16_t num_queues,
61     uint16_t num_desc,
62     struct rte_mempool* rx_pktbuf_pool,
63     struct rte_mempool* tx_pktbuf_pool,
64     std::string ipv4_address)
65 {
66     return std::make_unique<dpdk_port>(
67         port, mtu, num_queues, num_desc, rx_pktbuf_pool, tx_pktbuf_pool, ipv4_address);
68 }
69 
dpdk_port(port_id_t port,size_t mtu,uint16_t num_queues,uint16_t num_desc,struct rte_mempool * rx_pktbuf_pool,struct rte_mempool * tx_pktbuf_pool,std::string ipv4_address)70 dpdk_port::dpdk_port(port_id_t port,
71     size_t mtu,
72     uint16_t num_queues,
73     uint16_t num_desc,
74     struct rte_mempool* rx_pktbuf_pool,
75     struct rte_mempool* tx_pktbuf_pool,
76     std::string ipv4_address)
77     : _port(port)
78     , _mtu(mtu)
79     , _num_queues(num_queues)
80     , _rx_pktbuf_pool(rx_pktbuf_pool)
81     , _tx_pktbuf_pool(tx_pktbuf_pool)
82 {
83     /* Set MTU and IPv4 address */
84     int retval;
85 
86     retval = rte_eth_dev_set_mtu(_port, _mtu);
87     if (retval) {
88         uint16_t actual_mtu;
89         UHD_LOGGER_WARNING("DPDK")
90             << boost::format("Port %d: Could not set mtu to %d") % _port % _mtu;
91         rte_eth_dev_get_mtu(_port, &actual_mtu);
92         UHD_LOGGER_WARNING("DPDK")
93             << boost::format("Port %d: Current mtu=%d") % _port % _mtu;
94         _mtu = actual_mtu;
95     }
96 
97     separate_ipv4_addr(ipv4_address, _ipv4, _netmask);
98 
99     /* Set hardware offloads */
100     struct rte_eth_dev_info dev_info;
101     rte_eth_dev_info_get(_port, &dev_info);
102     uint64_t rx_offloads = DEV_RX_OFFLOAD_IPV4_CKSUM;
103     uint64_t tx_offloads = DEV_TX_OFFLOAD_IPV4_CKSUM;
104     if ((dev_info.rx_offload_capa & rx_offloads) != rx_offloads) {
105         UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports RX offloads 0x%0llx")
106                                         % _port % dev_info.rx_offload_capa;
107         throw uhd::runtime_error("DPDK: Missing required RX offloads");
108     }
109     if ((dev_info.tx_offload_capa & tx_offloads) != tx_offloads) {
110         UHD_LOGGER_ERROR("DPDK") << boost::format("%d: Only supports TX offloads 0x%0llx")
111                                         % _port % dev_info.tx_offload_capa;
112         throw uhd::runtime_error("DPDK: Missing required TX offloads");
113     }
114 
115     // Check number of available queues
116     if (dev_info.max_rx_queues < num_queues || dev_info.max_tx_queues < num_queues) {
117         _num_queues = std::min(dev_info.max_rx_queues, dev_info.max_tx_queues);
118         UHD_LOGGER_WARNING("DPDK")
119             << boost::format("%d: Maximum queues supported is %d") % _port % _num_queues;
120     } else {
121         _num_queues = num_queues;
122     }
123 
124     struct rte_eth_conf port_conf   = {};
125     port_conf.rxmode.offloads       = rx_offloads | DEV_RX_OFFLOAD_JUMBO_FRAME;
126     port_conf.rxmode.max_rx_pkt_len = _mtu;
127     port_conf.txmode.offloads       = tx_offloads;
128     port_conf.intr_conf.lsc         = 1;
129 
130     retval = rte_eth_dev_configure(_port, _num_queues, _num_queues, &port_conf);
131     if (retval != 0) {
132         UHD_LOG_ERROR("DPDK", "Failed to configure the device");
133         throw uhd::runtime_error("DPDK: Failed to configure the device");
134     }
135 
136     /* Set descriptor ring sizes */
137     uint16_t rx_desc = num_desc;
138     if (dev_info.rx_desc_lim.nb_max < rx_desc || dev_info.rx_desc_lim.nb_min > rx_desc
139         || (dev_info.rx_desc_lim.nb_align - 1) & rx_desc) {
140         UHD_LOGGER_ERROR("DPDK")
141             << boost::format("%d: %d RX descriptors requested, but must be in [%d,%d]")
142                    % _port % num_desc % dev_info.rx_desc_lim.nb_min
143                    % dev_info.rx_desc_lim.nb_max;
144         UHD_LOGGER_ERROR("DPDK")
145             << boost::format("Num RX descriptors must also be aligned to 0x%x")
146                    % dev_info.rx_desc_lim.nb_align;
147         throw uhd::runtime_error("DPDK: Failed to allocate RX descriptors");
148     }
149 
150     uint16_t tx_desc = num_desc;
151     if (dev_info.tx_desc_lim.nb_max < tx_desc || dev_info.tx_desc_lim.nb_min > tx_desc
152         || (dev_info.tx_desc_lim.nb_align - 1) & tx_desc) {
153         UHD_LOGGER_ERROR("DPDK")
154             << boost::format("%d: %d TX descriptors requested, but must be in [%d,%d]")
155                    % _port % num_desc % dev_info.tx_desc_lim.nb_min
156                    % dev_info.tx_desc_lim.nb_max;
157         UHD_LOGGER_ERROR("DPDK")
158             << boost::format("Num TX descriptors must also be aligned to 0x%x")
159                    % dev_info.tx_desc_lim.nb_align;
160         throw uhd::runtime_error("DPDK: Failed to allocate TX descriptors");
161     }
162 
163     retval = rte_eth_dev_adjust_nb_rx_tx_desc(_port, &rx_desc, &tx_desc);
164     if (retval != 0) {
165         UHD_LOG_ERROR("DPDK", "Failed to configure the DMA queues ");
166         throw uhd::runtime_error("DPDK: Failed to configure the DMA queues");
167     }
168 
169     /* Set up the RX and TX DMA queues (May not be generally supported after
170      * eth_dev_start) */
171     unsigned int cpu_socket = rte_eth_dev_socket_id(_port);
172     for (uint16_t i = 0; i < _num_queues; i++) {
173         retval =
174             rte_eth_rx_queue_setup(_port, i, rx_desc, cpu_socket, NULL, _rx_pktbuf_pool);
175         if (retval < 0) {
176             UHD_LOGGER_ERROR("DPDK")
177                 << boost::format("Port %d: Could not init RX queue %d") % _port % i;
178             throw uhd::runtime_error("DPDK: Failure to init RX queue");
179         }
180 
181         struct rte_eth_txconf txconf = dev_info.default_txconf;
182         txconf.offloads              = DEV_TX_OFFLOAD_IPV4_CKSUM;
183         retval = rte_eth_tx_queue_setup(_port, i, tx_desc, cpu_socket, &txconf);
184         if (retval < 0) {
185             UHD_LOGGER_ERROR("DPDK")
186                 << boost::format("Port %d: Could not init TX queue %d") % _port % i;
187             throw uhd::runtime_error("DPDK: Failure to init TX queue");
188         }
189     }
190 
191     /* TODO: Enable multiple queues (only support 1 right now) */
192 
193     /* Start the Ethernet device */
194     retval = rte_eth_dev_start(_port);
195     if (retval < 0) {
196         UHD_LOGGER_ERROR("DPDK")
197             << boost::format("Port %d: Could not start device") % _port;
198         throw uhd::runtime_error("DPDK: Failure to start device");
199     }
200 
201     /* Grab and display the port MAC address. */
202     rte_eth_macaddr_get(_port, &_mac_addr);
203     UHD_LOGGER_TRACE("DPDK") << "Port " << _port
204                              << " MAC: " << eth_addr_to_string(_mac_addr);
205 }
206 
~dpdk_port()207 dpdk_port::~dpdk_port()
208 {
209     rte_eth_dev_stop(_port);
210     rte_spinlock_lock(&_spinlock);
211     for (auto kv : _arp_table) {
212         for (auto req : kv.second->reqs) {
213             req->cond.notify_one();
214         }
215         rte_free(kv.second);
216     }
217     _arp_table.clear();
218     rte_spinlock_unlock(&_spinlock);
219 }
220 
alloc_udp_port(uint16_t udp_port)221 uint16_t dpdk_port::alloc_udp_port(uint16_t udp_port)
222 {
223     uint16_t port_selected;
224     std::lock_guard<std::mutex> lock(_mutex);
225     if (udp_port) {
226         if (_udp_ports.count(rte_be_to_cpu_16(udp_port))) {
227             return 0;
228         }
229         port_selected = rte_be_to_cpu_16(udp_port);
230     } else {
231         if (_udp_ports.size() >= 65535) {
232             UHD_LOG_WARNING("DPDK", "Attempted to allocate UDP port, but none remain");
233             return 0;
234         }
235         port_selected = _next_udp_port;
236         while (true) {
237             if (port_selected == 0) {
238                 continue;
239             }
240             if (_udp_ports.count(port_selected) == 0) {
241                 _next_udp_port = port_selected - 1;
242                 break;
243             }
244             if (port_selected - 1 == _next_udp_port) {
245                 return 0;
246             }
247             port_selected--;
248         }
249     }
250     _udp_ports.insert(port_selected);
251     return rte_cpu_to_be_16(port_selected);
252 }
253 
_arp_reply(queue_id_t queue_id,struct arp_hdr * arp_req)254 int dpdk_port::_arp_reply(queue_id_t queue_id, struct arp_hdr* arp_req)
255 {
256     struct rte_mbuf* mbuf;
257     struct ether_hdr* hdr;
258     struct arp_hdr* arp_frame;
259 
260     mbuf = rte_pktmbuf_alloc(_tx_pktbuf_pool);
261     if (unlikely(mbuf == NULL)) {
262         UHD_LOG_WARNING("DPDK", "Could not allocate packet buffer for ARP response");
263         return -ENOMEM;
264     }
265 
266     hdr       = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
267     arp_frame = (struct arp_hdr*)&hdr[1];
268 
269     ether_addr_copy(&arp_req->arp_data.arp_sha, &hdr->d_addr);
270     ether_addr_copy(&_mac_addr, &hdr->s_addr);
271     hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
272 
273     arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER);
274     arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
275     arp_frame->arp_hln = 6;
276     arp_frame->arp_pln = 4;
277     arp_frame->arp_op  = rte_cpu_to_be_16(ARP_OP_REPLY);
278     ether_addr_copy(&_mac_addr, &arp_frame->arp_data.arp_sha);
279     arp_frame->arp_data.arp_sip = _ipv4;
280     ether_addr_copy(&hdr->d_addr, &arp_frame->arp_data.arp_tha);
281     arp_frame->arp_data.arp_tip = arp_req->arp_data.arp_sip;
282 
283     mbuf->pkt_len  = 42;
284     mbuf->data_len = 42;
285 
286     if (rte_eth_tx_burst(_port, queue_id, &mbuf, 1) != 1) {
287         UHD_LOGGER_WARNING("DPDK")
288             << boost::format("%s: TX descriptor ring is full") % __func__;
289         rte_pktmbuf_free(mbuf);
290         return -EAGAIN;
291     }
292     return 0;
293 }
294 
295 static dpdk_ctx::sptr global_ctx = nullptr;
296 static std::mutex global_ctx_mutex;
297 
get()298 dpdk_ctx::sptr dpdk_ctx::get()
299 {
300     std::lock_guard<std::mutex> lock(global_ctx_mutex);
301     if (!global_ctx) {
302         global_ctx = std::make_shared<dpdk_ctx>();
303     }
304     return global_ctx;
305 }
306 
dpdk_ctx(void)307 dpdk_ctx::dpdk_ctx(void) : _init_done(false) {}
308 
~dpdk_ctx(void)309 dpdk_ctx::~dpdk_ctx(void)
310 {
311     std::lock_guard<std::mutex> lock(global_ctx_mutex);
312     global_ctx = nullptr;
313     // Destroy the io service
314     _io_srv_portid_map.clear();
315     // Destroy and stop all the ports
316     _ports.clear();
317     // Free mempools
318     for (auto& pool : _rx_pktbuf_pools) {
319         rte_mempool_free(pool);
320     }
321     for (auto& pool : _tx_pktbuf_pools) {
322         rte_mempool_free(pool);
323     }
324     // Free EAL resources
325     rte_eal_cleanup();
326 }
327 
_eal_init(const device_addr_t & eal_args)328 void dpdk_ctx::_eal_init(const device_addr_t& eal_args)
329 {
330     /* Build up argc and argv */
331     std::vector<const char*> argv;
332     argv.push_back("uhd::transport::dpdk");
333     auto args = new std::array<char, 4096>();
334     char* opt = args->data();
335     char* end = args->data() + args->size();
336     UHD_LOG_TRACE("DPDK", "EAL init options: ");
337     for (std::string& key : eal_args.keys()) {
338         std::string val = eal_args[key];
339         if (key == "dpdk_coremask") {
340             opt = eal_add_opt(argv, end - opt, opt, "-c", val.c_str());
341         } else if (key == "dpdk_corelist") {
342             /* NOTE: This arg may have commas, so limited to config file */
343             opt = eal_add_opt(argv, end - opt, opt, "-l", val.c_str());
344         } else if (key == "dpdk_coremap") {
345             opt = eal_add_opt(argv, end - opt, opt, "--lcores", val.c_str());
346         } else if (key == "dpdk_master_lcore") {
347             opt = eal_add_opt(argv, end - opt, opt, "--master-lcore", val.c_str());
348         } else if (key == "dpdk_pci_blacklist") {
349             opt = eal_add_opt(argv, end - opt, opt, "-b", val.c_str());
350         } else if (key == "dpdk_pci_whitelist") {
351             opt = eal_add_opt(argv, end - opt, opt, "-w", val.c_str());
352         } else if (key == "dpdk_log_level") {
353             opt = eal_add_opt(argv, end - opt, opt, "--log-level", val.c_str());
354         } else if (key == "dpdk_huge_dir") {
355             opt = eal_add_opt(argv, end - opt, opt, "--huge-dir", val.c_str());
356         } else if (key == "dpdk_file_prefix") {
357             opt = eal_add_opt(argv, end - opt, opt, "--file-prefix", val.c_str());
358         } else if (key == "dpdk_driver") {
359             opt = eal_add_opt(argv, end - opt, opt, "-d", val.c_str());
360         }
361         /* TODO: Change where log goes?
362            int rte_openlog_stream( FILE * f)
363          */
364     }
365     /* Init DPDK's EAL */
366     int ret = rte_eal_init(argv.size(), (char**)argv.data());
367     /* Done with the temporaries */
368     delete args;
369 
370     if (ret < 0) {
371         UHD_LOG_ERROR("DPDK", "Error with EAL initialization");
372         throw uhd::runtime_error("Error with EAL initialization");
373     }
374 
375     /* Create pktbuf pool entries, but only allocate on use  */
376     int socket_count = rte_socket_count();
377     for (int i = 0; i < socket_count; i++) {
378         _rx_pktbuf_pools.push_back(NULL);
379         _tx_pktbuf_pools.push_back(NULL);
380     }
381 }
382 
383 /**
384  * Init DPDK environment, including DPDK's EAL.
385  * This will make available information about the DPDK-assigned NIC devices.
386  *
387  * \param user_args User args passed in to override config files
388  */
init(const device_addr_t & user_args)389 void dpdk_ctx::init(const device_addr_t& user_args)
390 {
391     unsigned int i;
392     std::lock_guard<std::mutex> lock(_init_mutex);
393     if (!_init_done) {
394         /* Gather global config, build args for EAL, and init UHD-DPDK */
395         const device_addr_t dpdk_args = uhd::prefs::get_dpdk_args(user_args);
396         UHD_LOG_TRACE("DPDK", "Configuration:" << std::endl << dpdk_args.to_pp_string());
397         _eal_init(dpdk_args);
398 
399         /* TODO: Should MTU be defined per-port? */
400         _mtu = dpdk_args.cast<size_t>("dpdk_mtu", DEFAULT_FRAME_SIZE);
401         /* This is per queue */
402         _num_mbufs = dpdk_args.cast<int>("dpdk_num_mbufs", DEFAULT_NUM_MBUFS);
403         _mbuf_cache_size =
404             dpdk_args.cast<int>("dpdk_mbuf_cache_size", DEFAULT_MBUF_CACHE_SIZE);
405         UHD_LOG_TRACE("DPDK",
406             "mtu: " << _mtu << " num_mbufs: " << _num_mbufs
407                     << " mbuf_cache_size: " << _mbuf_cache_size);
408 
409         _link_init_timeout =
410             dpdk_args.cast<int>("dpdk_link_timeout", DEFAULT_DPDK_LINK_INIT_TIMEOUT);
411 
412         /* Get device info for all the NIC ports */
413         int num_dpdk_ports = rte_eth_dev_count_avail();
414         if (num_dpdk_ports == 0) {
415             UHD_LOG_ERROR("DPDK", "No available DPDK devices (ports) found!");
416             throw uhd::runtime_error("No available DPDK devices (ports) found!");
417         }
418         device_addrs_t nics(num_dpdk_ports);
419         RTE_ETH_FOREACH_DEV(i)
420         {
421             struct ether_addr mac_addr;
422             rte_eth_macaddr_get(i, &mac_addr);
423             nics[i]["dpdk_mac"] = eth_addr_to_string(mac_addr);
424         }
425 
426         /* Get user configuration for each NIC port */
427         device_addrs_t args = separate_device_addr(user_args);
428         size_t queue_count  = 0;
429         RTE_ETH_FOREACH_DEV(i)
430         {
431             auto& nic = nics.at(i);
432             for (const auto& arg : args) {
433                 /* Match DPDK-discovered NICs and user config via MAC addr */
434                 if (arg.has_key("dpdk_mac") && nic["dpdk_mac"] == arg["dpdk_mac"]) {
435                     /* Copy user args for discovered NICs */
436                     nic.update(arg, false);
437                     break;
438                 }
439             }
440             /* Now combine user args with conf file */
441             auto conf = uhd::prefs::get_dpdk_nic_args(nic);
442             // TODO: Enable the use of multiple DMA queues
443             conf["dpdk_num_queues"] = "1";
444 
445             /* Update config, and remove ports that aren't fully configured */
446             if (conf.has_key("dpdk_ipv4")) {
447                 nics[i] = conf;
448                 /* Update queue count, to generate a large enough mempool */
449                 queue_count += conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count());
450             } else {
451                 nics[i] = device_addr_t();
452             }
453         }
454 
455         std::map<size_t, std::vector<size_t>> lcore_to_port_id_map;
456         RTE_ETH_FOREACH_DEV(i)
457         {
458             auto& conf = nics.at(i);
459             if (conf.has_key("dpdk_ipv4")) {
460                 UHD_ASSERT_THROW(conf.has_key("dpdk_lcore"));
461                 const size_t lcore_id = conf.cast<size_t>("dpdk_lcore", 0);
462                 if (!lcore_to_port_id_map.count(lcore_id)) {
463                     lcore_to_port_id_map.insert({lcore_id, {}});
464                 }
465 
466                 // Allocating enough buffers for all DMA queues for each CPU socket
467                 // - This is a bit inefficient for larger systems, since NICs may not
468                 //   all be on one socket
469                 auto cpu_socket = rte_eth_dev_socket_id(i);
470                 auto rx_pool = _get_rx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count);
471                 auto tx_pool = _get_tx_pktbuf_pool(cpu_socket, _num_mbufs * queue_count);
472                 UHD_LOG_TRACE("DPDK",
473                     "Initializing NIC(" << i << "):" << std::endl
474                                         << conf.to_pp_string());
475                 _ports[i] = dpdk_port::make(i,
476                     _mtu,
477                     conf.cast<uint16_t>("dpdk_num_queues", rte_lcore_count()),
478                     conf.cast<uint16_t>("dpdk_num_desc", DPDK_DEFAULT_RING_SIZE),
479                     rx_pool,
480                     tx_pool,
481                     conf["dpdk_ipv4"]);
482 
483                 // Remember all port IDs that map to an lcore
484                 lcore_to_port_id_map.at(lcore_id).push_back(i);
485             }
486         }
487 
488         UHD_LOG_TRACE("DPDK", "Waiting for links to come up...");
489         do {
490             bool all_ports_up = true;
491             for (auto& port : _ports) {
492                 struct rte_eth_link link;
493                 auto portid = port.second->get_port_id();
494                 rte_eth_link_get(portid, &link);
495                 unsigned int link_status = link.link_status;
496                 unsigned int link_speed  = link.link_speed;
497                 UHD_LOGGER_TRACE("DPDK") << boost::format("Port %u UP: %d, %u Mbps")
498                                                 % portid % link_status % link_speed;
499                 all_ports_up &= (link.link_status == 1);
500             }
501 
502             if (all_ports_up) {
503                 break;
504             }
505 
506             rte_delay_ms(LINK_STATUS_INTERVAL);
507             _link_init_timeout -= LINK_STATUS_INTERVAL;
508             if (_link_init_timeout <= 0 && not all_ports_up) {
509                 UHD_LOG_ERROR("DPDK", "All DPDK links did not report as up!")
510                 throw uhd::runtime_error("DPDK: All DPDK links did not report as up!");
511             }
512         } while (true);
513 
514         UHD_LOG_TRACE("DPDK", "Init done -- spawning IO services");
515         _init_done = true;
516 
517         // Links are up, now create one IO service per lcore
518         for (auto& lcore_portids_pair : lcore_to_port_id_map) {
519             const size_t lcore_id = lcore_portids_pair.first;
520             std::vector<dpdk_port*> dpdk_ports;
521             dpdk_ports.reserve(lcore_portids_pair.second.size());
522             for (const size_t port_id : lcore_portids_pair.second) {
523                 dpdk_ports.push_back(get_port(port_id));
524             }
525             const size_t servq_depth = 32; // FIXME
526             UHD_LOG_TRACE("DPDK",
527                 "Creating I/O service for lcore "
528                     << lcore_id << ", servicing " << dpdk_ports.size()
529                     << " ports, service queue depth " << servq_depth);
530             _io_srv_portid_map.insert(
531                 {uhd::transport::dpdk_io_service::make(lcore_id, dpdk_ports, servq_depth),
532                     lcore_portids_pair.second});
533         }
534     }
535 }
536 
get_port(port_id_t port) const537 dpdk_port* dpdk_ctx::get_port(port_id_t port) const
538 {
539     assert(is_init_done());
540     if (_ports.count(port) == 0) {
541         return nullptr;
542     }
543     return _ports.at(port).get();
544 }
545 
get_port(struct ether_addr mac_addr) const546 dpdk_port* dpdk_ctx::get_port(struct ether_addr mac_addr) const
547 {
548     assert(is_init_done());
549     for (const auto& port : _ports) {
550         struct ether_addr port_mac_addr;
551         rte_eth_macaddr_get(port.first, &port_mac_addr);
552         for (int j = 0; j < 6; j++) {
553             if (mac_addr.addr_bytes[j] != port_mac_addr.addr_bytes[j]) {
554                 break;
555             }
556             if (j == 5) {
557                 return port.second.get();
558             }
559         }
560     }
561     return nullptr;
562 }
563 
get_port_count(void)564 int dpdk_ctx::get_port_count(void)
565 {
566     assert(is_init_done());
567     return _ports.size();
568 }
569 
get_port_queue_count(port_id_t portid)570 int dpdk_ctx::get_port_queue_count(port_id_t portid)
571 {
572     assert(is_init_done());
573     return _ports.at(portid)->get_queue_count();
574 }
575 
get_port_link_status(port_id_t portid) const576 int dpdk_ctx::get_port_link_status(port_id_t portid) const
577 {
578     struct rte_eth_link link;
579     rte_eth_link_get_nowait(portid, &link);
580     return link.link_status;
581 }
582 
get_route(const std::string & addr) const583 dpdk_port* dpdk_ctx::get_route(const std::string& addr) const
584 {
585     const uint32_t dst_ipv4 = (uint32_t)inet_addr(addr.c_str());
586     for (const auto& port : _ports) {
587         if (get_port_link_status(port.first) < 1)
588             continue;
589         uint32_t src_ipv4 = port.second->get_ipv4();
590         uint32_t netmask  = port.second->get_netmask();
591         if ((src_ipv4 & netmask) == (dst_ipv4 & netmask)) {
592             return port.second.get();
593         }
594     }
595     return NULL;
596 }
597 
598 
is_init_done(void) const599 bool dpdk_ctx::is_init_done(void) const
600 {
601     return _init_done.load();
602 }
603 
get_io_service(const size_t port_id)604 uhd::transport::dpdk_io_service::sptr dpdk_ctx::get_io_service(const size_t port_id)
605 {
606     for (auto& io_srv_portid_pair : _io_srv_portid_map) {
607         if (uhd::has(io_srv_portid_pair.second, port_id)) {
608             return io_srv_portid_pair.first;
609         }
610     }
611 
612     std::string err_msg = std::string("Cannot look up I/O service for port ID: ")
613                           + std::to_string(port_id) + ". No such port ID!";
614     UHD_LOG_ERROR("DPDK", err_msg);
615     throw uhd::lookup_error(err_msg);
616 }
617 
_get_rx_pktbuf_pool(unsigned int cpu_socket,size_t num_bufs)618 struct rte_mempool* dpdk_ctx::_get_rx_pktbuf_pool(
619     unsigned int cpu_socket, size_t num_bufs)
620 {
621     if (!_rx_pktbuf_pools.at(cpu_socket)) {
622         const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;
623         char name[32];
624         snprintf(name, sizeof(name), "rx_mbuf_pool_%u", cpu_socket);
625         _rx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(name,
626             num_bufs,
627             _mbuf_cache_size,
628             DPDK_MBUF_PRIV_SIZE,
629             mbuf_size,
630             SOCKET_ID_ANY);
631         if (!_rx_pktbuf_pools.at(cpu_socket)) {
632             UHD_LOG_ERROR("DPDK", "Could not allocate RX pktbuf pool");
633             throw uhd::runtime_error("DPDK: Could not allocate RX pktbuf pool");
634         }
635     }
636     return _rx_pktbuf_pools.at(cpu_socket);
637 }
638 
_get_tx_pktbuf_pool(unsigned int cpu_socket,size_t num_bufs)639 struct rte_mempool* dpdk_ctx::_get_tx_pktbuf_pool(
640     unsigned int cpu_socket, size_t num_bufs)
641 {
642     if (!_tx_pktbuf_pools.at(cpu_socket)) {
643         const int mbuf_size = _mtu + RTE_PKTMBUF_HEADROOM;
644         char name[32];
645         snprintf(name, sizeof(name), "tx_mbuf_pool_%u", cpu_socket);
646         _tx_pktbuf_pools[cpu_socket] = rte_pktmbuf_pool_create(
647             name, num_bufs, _mbuf_cache_size, 0, mbuf_size, SOCKET_ID_ANY);
648         if (!_tx_pktbuf_pools.at(cpu_socket)) {
649             UHD_LOG_ERROR("DPDK", "Could not allocate TX pktbuf pool");
650             throw uhd::runtime_error("DPDK: Could not allocate TX pktbuf pool");
651         }
652     }
653     return _tx_pktbuf_pools.at(cpu_socket);
654 }
655 
656 }}} // namespace uhd::transport::dpdk
657