//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/utils/log.hpp>
#include <uhd/utils/thread.hpp>
#include <uhdlib/transport/dpdk/arp.hpp>
#include <uhdlib/transport/dpdk/udp.hpp>
#include <uhdlib/transport/dpdk_io_service_client.hpp>
#include <uhdlib/utils/narrow.hpp>
#include <cmath>

/*
 * Memory management
 *
 * Every object that allocates and frees DPDK memory has a reference to the
 * dpdk_ctx.
 *
 * Ownership hierarchy:
 *
 * dpdk_io_service_mgr (1) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * xport (1) =>
 *     dpdk_send_io::sptr
 *     dpdk_recv_io::sptr
 *
 * usrp link_mgr (1) =>
 *     udp_dpdk_link::sptr
 *
 * dpdk_send_io (2) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * dpdk_recv_io (2) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * dpdk_io_service (3) =>
 *     dpdk_ctx::wptr (weak_ptr)
 *     udp_dpdk_link::sptr
 *
 * udp_dpdk_link (4) =>
 *     dpdk_ctx::sptr
 */
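
/*
 * Note: dpdk_io_service holds only a weak_ptr to the dpdk_ctx (see above), so
 * it observes the context without extending its lifetime.
 */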

using namespace uhd::transport;

dpdk_io_service::dpdk_io_service(
    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
    : _ctx(dpdk::dpdk_ctx::get())
    , _lcore_id(lcore_id)
    , _ports(ports)
    , _servq(servq_depth, lcore_id)
{
    UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id);
    for (auto port : _ports) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE",
            "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id());
        _tx_queues[port->get_port_id()]      = std::list<dpdk_send_io*>();
        _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>();
    }
    int status = rte_eal_remote_launch(_io_worker, this, lcore_id);
    if (status) {
        throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore");
    }
}

dpdk_io_service::sptr dpdk_io_service::make(
    unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
{
    return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth));
}
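
// Usage sketch (hypothetical values): create an I/O service pinned to lcore 1
// that serves one DPDK port (port_ptr, a dpdk::dpdk_port* obtained from the
// dpdk_ctx) with a 16-entry service queue:
//
//     auto io_srv = dpdk_io_service::make(1, {port_ptr}, 16);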

dpdk_io_service::~dpdk_io_service()
{
    UHD_LOG_TRACE(
        "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id);
    dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL);
    if (!req) {
        UHD_LOG_ERROR("DPDK::IO_SERVICE",
            "Could not allocate request for lcore termination for lcore " << _lcore_id);
        return;
    }
    dpdk::wait_req_get(req);
    _servq.submit(req, std::chrono::microseconds(-1));
    dpdk::wait_req_put(req);
}

void dpdk_io_service::attach_recv_link(recv_link_if::sptr link)
{
    struct dpdk_flow_data data;
    data.link    = dynamic_cast<udp_dpdk_link*>(link.get());
    data.is_recv = true;
    assert(data.link);
    auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
    if (!req) {
        UHD_LOG_ERROR(
            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link");
        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link");
    }
    _servq.submit(req, std::chrono::microseconds(-1));
    dpdk::wait_req_put(req);
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _recv_links.push_back(link);
    }
}

void dpdk_io_service::attach_send_link(send_link_if::sptr link)
{
    udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get());
    assert(dpdk_link);

    // First, fill in destination MAC address
    struct dpdk::arp_request arp_data;
    arp_data.tpa  = dpdk_link->get_remote_ipv4();
    arp_data.port = dpdk_link->get_port()->get_port_id();
    if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) {
        // If a broadcast IP, skip the ARP and fill with broadcast MAC addr
        memset(arp_data.tha.addr_bytes, 0xFF, 6);
    } else {
        auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
        if (!arp_req) {
            UHD_LOG_ERROR(
                "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request");
            throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request");
        }
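        // If the first ARP attempt does not complete within 3 seconds, retry
        // once with a 30-second wait before giving up on the host.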
        if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) {
            // Try one more time...
            auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
            if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) {
                wait_req_put(arp_req);
                wait_req_put(arp_req2);
                throw uhd::io_error("DPDK: Could not reach host");
            }
            wait_req_put(arp_req2);
        }
        wait_req_put(arp_req);
    }
    dpdk_link->set_remote_mac(arp_data.tha);

    // Then, submit the link to the I/O service thread
    struct dpdk_flow_data data;
    data.link    = dpdk_link;
    data.is_recv = false;
    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
    if (!req) {
        UHD_LOG_ERROR(
            "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link");
        throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link");
    }
    _servq.submit(req, std::chrono::microseconds(-1));
    wait_req_put(req);
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _send_links.push_back(link);
    }
}

void dpdk_io_service::detach_recv_link(recv_link_if::sptr link)
{
    auto link_ptr = link.get();
    struct dpdk_flow_data data;
    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr);
    data.is_recv = true;
    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
    if (!req) {
        UHD_LOG_ERROR(
            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link");
        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link");
    }
    _servq.submit(req, std::chrono::microseconds(-1));
    wait_req_put(req);
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _recv_links.remove_if(
            [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
    }
}

void dpdk_io_service::detach_send_link(send_link_if::sptr link)
{
    auto link_ptr = link.get();
    struct dpdk_flow_data data;
    data.link    = dynamic_cast<udp_dpdk_link*>(link_ptr);
    data.is_recv = false;
    auto req     = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
    if (!req) {
        UHD_LOG_ERROR(
            "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link");
        throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link");
    }
    _servq.submit(req, std::chrono::microseconds(-1));
    wait_req_put(req);
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _send_links.remove_if(
            [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
    }
}

recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link,
    size_t num_recv_frames,
    recv_callback_t cb,
    send_link_if::sptr /*fc_link*/,
    size_t num_send_frames,
    recv_io_if::fc_callback_t fc_cb)
{
    auto link    = dynamic_cast<udp_dpdk_link*>(data_link.get());
    auto recv_io = std::make_shared<dpdk_recv_io>(
        shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb);

    // Register with I/O service
    recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get());
    auto xport_req                 = dpdk::wait_req_alloc(
        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if);
    _servq.submit(xport_req, std::chrono::microseconds(-1));
    wait_req_put(xport_req);
    return recv_io;
}

send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link,
    size_t num_send_frames,
    send_io_if::send_callback_t send_cb,
    recv_link_if::sptr /*recv_link*/,
    size_t num_recv_frames,
    recv_callback_t recv_cb,
    send_io_if::fc_callback_t fc_cb)
{
    auto link    = dynamic_cast<udp_dpdk_link*>(send_link.get());
    auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(),
        link,
        num_send_frames,
        send_cb,
        num_recv_frames,
        recv_cb,
        fc_cb);

    // Register with I/O service
    send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get());
    auto xport_req                 = dpdk::wait_req_alloc(
        dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if);
    _servq.submit(xport_req, std::chrono::microseconds(-1));
    wait_req_put(xport_req);
    return send_io;
}


int dpdk_io_service::_io_worker(void* arg)
{
    if (!arg)
        return -EINVAL;
    dpdk_io_service* srv = (dpdk_io_service*)arg;

    /* Check that this is a valid lcore */
    unsigned int lcore_id = rte_lcore_id();
    if (lcore_id == LCORE_ID_ANY)
        return -ENODEV;

    /* Check that this lcore has ports */
    if (srv->_ports.size() == 0)
        return -ENODEV;

    char name[16];
    snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id);
    rte_thread_setname(pthread_self(), name);
    UHD_LOG_TRACE("DPDK::IO_SERVICE",
        "I/O service thread '" << name << "' started on lcore " << lcore_id);

    uhd::set_thread_priority_safe();

    snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id);
    struct rte_hash_parameters hash_params = {.name = name,
        .entries                                    = MAX_FLOWS,
        .reserved                                   = 0,
        .key_len                                    = sizeof(struct dpdk::ipv4_5tuple),
        .hash_func                                  = NULL,
        .hash_func_init_val                         = 0,
        .socket_id  = uhd::narrow_cast<int>(rte_socket_id()),
        .extra_flag = 0};
    srv->_rx_table = rte_hash_create(&hash_params);
    if (srv->_rx_table == NULL) {
        return rte_errno;
    }

    int status = 0;
    while (!status) {
        /* For each port, attempt to receive packets and process */
        for (auto port : srv->_ports) {
            srv->_rx_burst(port, 0);
        }
        /* For each port's TX queues, do TX */
        for (auto port : srv->_ports) {
            srv->_tx_burst(port);
        }
        /* For each port's RX release queues, release buffers */
        for (auto port : srv->_ports) {
            srv->_rx_release(port);
        }
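        /* The retry list is a circular, doubly-linked list of clients whose
         * waiter mutex was contended on a previous wake attempt; see
         * _wake_client() below.
         */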
        /* Retry waking clients */
        if (srv->_retry_head) {
            dpdk_io_if* node = srv->_retry_head;
            dpdk_io_if* end  = srv->_retry_head->prev;
            while (true) {
                dpdk_io_if* next = node->next;
                srv->_wake_client(node);
                if (node == end) {
                    break;
                } else {
                    node = next;
                    next = node->next;
                }
            }
        }
        /* Check for open()/close()/term() requests and service 1 at a time
         * Leave this last so we immediately terminate if requested
         */
        status = srv->_service_requests();
    }

    return status;
}

int dpdk_io_service::_service_requests()
{
    for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) {
        /* Dequeue */
        dpdk::wait_req* req = _servq.pop();
        if (!req) {
            break;
        }
        switch (req->reason) {
            case dpdk::wait_type::WAIT_SIMPLE:
                while (_servq.complete(req) == -ENOBUFS)
                    ;
                break;
            case dpdk::wait_type::WAIT_RX:
            case dpdk::wait_type::WAIT_TX_BUF:
                throw uhd::not_implemented_error(
                    "DPDK: _service_requests(): DPDK is still a WIP");
            case dpdk::wait_type::WAIT_FLOW_OPEN:
                _service_flow_open(req);
                break;
            case dpdk::wait_type::WAIT_FLOW_CLOSE:
                _service_flow_close(req);
                break;
            case dpdk::wait_type::WAIT_XPORT_CONNECT:
                _service_xport_connect(req);
                break;
            case dpdk::wait_type::WAIT_XPORT_DISCONNECT:
                _service_xport_disconnect(req);
                break;
            case dpdk::wait_type::WAIT_ARP: {
                assert(req->data != NULL);
                int arp_status = _service_arp_request(req);
                assert(arp_status != -ENOMEM);
                if (arp_status == 0) {
                    while (_servq.complete(req) == -ENOBUFS)
                        ;
                }
                break;
            }
            case dpdk::wait_type::WAIT_LCORE_TERM:
                rte_free(_rx_table);
                while (_servq.complete(req) == -ENOBUFS)
                    ;
                // Return a positive value to indicate we should terminate
                return 1;
            default:
                UHD_LOG_ERROR(
                    "DPDK::IO_SERVICE", "Invalid reason associated with wait request");
                while (_servq.complete(req) == -ENOBUFS)
                    ;
                break;
        }
    }
    return 0;
}

void dpdk_io_service::_service_flow_open(dpdk::wait_req* req)
{
    auto flow_req_data = (struct dpdk_flow_data*)req->data;
    assert(flow_req_data);
    if (flow_req_data->is_recv) {
        // If RX, add to RX table. Currently, nothing to do for TX.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(),
            .src_port = 0,
            .dst_port = flow_req_data->link->get_local_port()};
        // Check the UDP port isn't in use
        if (rte_hash_lookup(_rx_table, &ht_key) > 0) {
            req->retval = -EADDRINUSE;
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table");
            while (_servq.complete(req) == -ENOBUFS)
                ;
            return;
        }
        // Add xport list for this UDP port
        auto rx_entry = new std::list<dpdk_io_if*>();
        if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) {
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table");
            delete rx_entry;
            req->retval = -ENOMEM;
            while (_servq.complete(req) == -ENOBUFS)
                ;
            return;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}

void dpdk_io_service::_service_flow_close(dpdk::wait_req* req)
{
    auto flow_req_data = (struct dpdk_flow_data*)req->data;
    assert(flow_req_data);
    if (flow_req_data->is_recv) {
        // If RX, remove from RX table. Currently, nothing to do for TX.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip   = flow_req_data->link->get_port()->get_ipv4(),
            .src_port = 0,
            .dst_port = flow_req_data->link->get_local_port()};
        std::list<dpdk_io_if*>* xport_list;

        if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) {
            UHD_ASSERT_THROW(xport_list->empty());
            delete xport_list;
            rte_hash_del_key(_rx_table, &ht_key);
            while (_servq.complete(req) == -ENOBUFS)
                ;
            return;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}

void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req)
{
    auto dpdk_io = static_cast<dpdk_io_if*>(req->data);
    UHD_ASSERT_THROW(dpdk_io);
    auto port = dpdk_io->link->get_port();
    if (dpdk_io->recv_cb) {
        // Add to RX table only if we have a receive callback.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip                                   = port->get_ipv4(),
            .src_port                                 = 0,
            .dst_port                                 = dpdk_io->link->get_local_port()};
        void* hash_data;
        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
            req->retval = -ENOENT;
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table");
            while (_servq.complete(req) == -ENOBUFS)
                ;
            return;
        }
        // Add to xport list for this UDP port
        auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
        rx_entry->push_back(dpdk_io);
    }
    if (dpdk_io->is_recv) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request...");
        // Add to xport list for this NIC port
        auto& xport_list = _recv_xport_map.at(port->get_port_id());
        xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client);
    } else {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request...");
        dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        // Add to xport list for this NIC port
        auto& xport_list = _tx_queues.at(port->get_port_id());
        xport_list.push_back(send_io);
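        // Pre-fill the client's buffer queue with TX frame buffers so it can
        // start sending without first waiting on the I/O thread.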
        for (size_t i = 0; i < send_io->_num_send_frames; i++) {
            auto buff_ptr =
                (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release();
            if (!buff_ptr) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE",
                    "TX mempool out of memory. Please increase dpdk_num_mbufs.");
                break;
            }
            if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
                rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                break;
            }
            send_io->_num_frames_in_use++;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}

void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req)
{
    auto dpdk_io = (struct dpdk_io_if*)req->data;
    assert(dpdk_io);
    auto port = dpdk_io->link->get_port();
    if (dpdk_io->recv_cb) {
        // Remove from RX table only if we have a receive callback.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip                                   = port->get_ipv4(),
            .src_port                                 = 0,
            .dst_port                                 = dpdk_io->link->get_local_port()};
        void* hash_data;
        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) {
            // Remove from xport list for this UDP port
            auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
            rx_entry->remove(dpdk_io);
        } else {
            req->retval = -EINVAL;
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table");
        }
    }
    if (dpdk_io->is_recv) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request...");
        dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
        // Remove from xport list for this NIC port
        auto& xport_list = _recv_xport_map.at(port->get_port_id());
        xport_list.remove(recv_client);
        while (!rte_ring_empty(recv_client->_recv_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr);
            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
        }
        while (!rte_ring_empty(recv_client->_release_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr);
            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
        }
    } else {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request...");
        dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        // Remove from xport list for this NIC port
        auto& xport_list = _tx_queues.at(port->get_port_id());
        xport_list.remove(send_client);
        while (!rte_ring_empty(send_client->_send_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr);
            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
        }
        while (!rte_ring_empty(send_client->_buffer_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr);
            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
        }
    }
    // Now remove the node if it's on the retry list
    if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) {
        _retry_head = NULL;
    } else if (_retry_head) {
        dpdk_io_if* node = _retry_head->next;
        while (node != _retry_head) {
            if (node == dpdk_io) {
                dpdk_io->prev->next = dpdk_io->next;
                dpdk_io->next->prev = dpdk_io->prev;
                break;
            }
            node = node->next;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}

int dpdk_io_service::_service_arp_request(dpdk::wait_req* req)
{
    int status               = 0;
    auto arp_req_data        = (struct dpdk::arp_request*)req->data;
    dpdk::ipv4_addr dst_addr = arp_req_data->tpa;
    auto ctx_sptr            = _ctx.lock();
    UHD_ASSERT_THROW(ctx_sptr);
    dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port);
    UHD_LOG_TRACE("DPDK::IO_SERVICE",
        "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr));

    rte_spinlock_lock(&port->_spinlock);
    struct dpdk::arp_entry* entry = NULL;
    if (port->_arp_table.count(dst_addr) == 0) {
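        // Allocate the entry from DPDK-managed memory and construct it in
        // place, since arp_entry contains C++ members.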
        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
        if (!entry) {
            status = -ENOMEM;
            goto arp_end;
        }
        entry = new (entry) dpdk::arp_entry();
        entry->reqs.push_back(req);
        port->_arp_table[dst_addr] = entry;
        status                     = -EAGAIN;
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request.");
        _send_arp_request(port, 0, arp_req_data->tpa);
    } else {
        entry = port->_arp_table.at(dst_addr);
        if (is_zero_ether_addr(&entry->mac_addr)) {
            UHD_LOG_TRACE("DPDK::IO_SERVICE",
                "ARP: Address in table, but not populated yet. Resending ARP request.");
            port->_arp_table.at(dst_addr)->reqs.push_back(req);
            status = -EAGAIN;
            _send_arp_request(port, 0, arp_req_data->tpa);
        } else {
            UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table.");
            ether_addr_copy(&entry->mac_addr, &arp_req_data->tha);
            status = 0;
        }
    }
arp_end:
    rte_spinlock_unlock(&port->_spinlock);
    return status;
}

int dpdk_io_service::_send_arp_request(
    dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip)
{
    struct rte_mbuf* mbuf;
    struct ether_hdr* hdr;
    struct arp_hdr* arp_frame;

    mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool());
    if (unlikely(mbuf == NULL)) {
        UHD_LOG_WARNING(
            "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request");
        return -ENOMEM;
    }

    hdr       = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
    arp_frame = (struct arp_hdr*)&hdr[1];

    memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
    hdr->s_addr     = port->get_mac_addr();
    hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);

    arp_frame->arp_hrd          = rte_cpu_to_be_16(ARP_HRD_ETHER);
    arp_frame->arp_pro          = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
    arp_frame->arp_hln          = 6;
    arp_frame->arp_pln          = 4;
    arp_frame->arp_op           = rte_cpu_to_be_16(ARP_OP_REQUEST);
    arp_frame->arp_data.arp_sha = port->get_mac_addr();
    arp_frame->arp_data.arp_sip = port->get_ipv4();
    memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
    arp_frame->arp_data.arp_tip = ip;

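    // 14-byte Ethernet header + 28-byte ARP payload = 42 bytes on the wire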
    mbuf->pkt_len  = 42;
    mbuf->data_len = 42;

    if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full");
        rte_pktmbuf_free(mbuf);
        return -EAGAIN;
    }
    return 0;
}

/* Do a burst of RX on port */
int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue)
{
    struct ether_hdr* hdr;
    char* l2_data;
    struct rte_mbuf* bufs[RX_BURST_SIZE];
    const uint16_t num_rx =
        rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE);
    if (unlikely(num_rx == 0)) {
        return 0;
    }

    for (int buf = 0; buf < num_rx; buf++) {
        uint64_t ol_flags = bufs[buf]->ol_flags;
        hdr               = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*);
        l2_data           = (char*)&hdr[1];
        switch (rte_be_to_cpu_16(hdr->ether_type)) {
            case ETHER_TYPE_ARP:
                _process_arp(port, queue, (struct arp_hdr*)l2_data);
                rte_pktmbuf_free(bufs[buf]);
                break;
            case ETHER_TYPE_IPv4:
                if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum");
                } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) {
                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum");
                } else {
                    _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data);
                }
                break;
            default:
                rte_pktmbuf_free(bufs[buf]);
                break;
        }
    }
    return num_rx;
}

int dpdk_io_service::_process_arp(
    dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame)
{
    uint32_t dest_ip            = arp_frame->arp_data.arp_sip;
    struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
    UHD_LOG_TRACE("DPDK::IO_SERVICE",
        "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> "
                                  << dpdk::eth_addr_to_string(dest_addr));
    /* Add entry to ARP table */
    rte_spinlock_lock(&port->_spinlock);
    struct dpdk::arp_entry* entry = NULL;
    if (port->_arp_table.count(dest_ip) == 0) {
        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
        if (!entry) {
            return -ENOMEM;
        }
        entry = new (entry) dpdk::arp_entry();
        ether_addr_copy(&dest_addr, &entry->mac_addr);
        port->_arp_table[dest_ip] = entry;
    } else {
        entry = port->_arp_table.at(dest_ip);
        ether_addr_copy(&dest_addr, &entry->mac_addr);
        for (auto req : entry->reqs) {
            auto arp_data = (struct dpdk::arp_request*)req->data;
            ether_addr_copy(&dest_addr, &arp_data->tha);
            while (_servq.complete(req) == -ENOBUFS)
                ;
        }
        entry->reqs.clear();
    }
    rte_spinlock_unlock(&port->_spinlock);

    /* Respond if this was an ARP request */
    if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST)
        && arp_frame->arp_data.arp_tip == port->get_ipv4()) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply.");
        port->_arp_reply(queue_id, arp_frame);
    }

    return 0;
}

int dpdk_io_service::_process_ipv4(
    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt)
{
    bool bcast = port->dst_is_broadcast(pkt->dst_addr);
    if (pkt->dst_addr != port->get_ipv4() && !bcast) {
        rte_pktmbuf_free(mbuf);
        return -ENODEV;
    }
    if (pkt->next_proto_id == IPPROTO_UDP) {
        return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast);
    }
    rte_pktmbuf_free(mbuf);
    return -EINVAL;
}


int dpdk_io_service::_process_udp(
    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/)
{
    // Get the link
    struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
        .src_ip                                   = 0,
        .dst_ip                                   = port->get_ipv4(),
        .src_port                                 = 0,
        .dst_port                                 = pkt->dst_port};
    void* hash_data;
    if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table");
        rte_pktmbuf_free(mbuf);
        return -ENOENT;
    }
    // Get xport list for this UDP port
    auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
    if (rx_entry->empty()) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link");
        rte_pktmbuf_free(mbuf);
        return -ENOENT;
    }
    // Turn rte_mbuf -> dpdk_frame_buff
    auto link = rx_entry->front()->link;
    link->enqueue_recv_mbuf(mbuf);
    auto buff       = link->get_recv_buff(0);
    bool rcvr_found = false;
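    // Offer the buffer to each receiver muxed onto this UDP port; the first
    // callback that claims it keeps the frame.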
    for (auto client_if : *rx_entry) {
        // Check all the muxed receivers...
        if (client_if->recv_cb(buff, link, link)) {
            rcvr_found = true;
            if (buff) {
                assert(client_if->is_recv);
                auto recv_io  = (dpdk_recv_io*)client_if->io_client;
                auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release();
                if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) {
                    rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                    UHD_LOG_WARNING(
                        "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue");
                } else {
                    recv_io->_num_frames_in_use++;
                    assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames);
                    _wake_client(client_if);
                }
            }
            break;
        }
    }
    if (!rcvr_found) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found");
        // Release the buffer if no receiver found
        link->release_recv_buff(std::move(buff));
        return -ENOENT;
    }
    return 0;
}

/* Do a burst of TX on port's tx queues */
int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port)
{
    unsigned int total_tx = 0;
    auto& queues          = _tx_queues.at(port->get_port_id());

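    // For each sender on this port, pass up to TX_BURST_SIZE queued frames to
    // its send callback, then try to hand a fresh buffer back to the client.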
    for (auto& send_io : queues) {
        unsigned int num_tx   = rte_ring_count(send_io->_send_queue);
        num_tx                = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE;
        bool replaced_buffers = false;
        for (unsigned int i = 0; i < num_tx; i++) {
            size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size();
            if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) {
                break;
            }
            dpdk::dpdk_frame_buff* buff_ptr;
            int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr);
            if (status) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual");
                break;
            }
            send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link);
            // Attempt to replace buffer
            buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0)
                           .release();
            if (!buff_ptr) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE",
                    "TX mempool out of memory. Please increase dpdk_num_mbufs.");
                send_io->_num_frames_in_use--;
            } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
                rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                send_io->_num_frames_in_use--;
            } else {
                replaced_buffers = true;
            }
        }
        if (replaced_buffers) {
            _wake_client(&send_io->_dpdk_io_if);
        }
        total_tx += num_tx;
    }

    return total_tx;
}

int dpdk_io_service::_rx_release(dpdk::dpdk_port* port)
{
    unsigned int total_bufs = 0;
    auto& queues            = _recv_xport_map.at(port->get_port_id());

    for (auto& recv_io : queues) {
        unsigned int num_buf = rte_ring_count(recv_io->_release_queue);
        num_buf              = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE;
        for (unsigned int i = 0; i < num_buf; i++) {
            dpdk::dpdk_frame_buff* buff_ptr;
            int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr);
            if (status) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual");
                break;
            }
            recv_io->_fc_cb(frame_buff::uptr(buff_ptr),
                recv_io->_dpdk_io_if.link,
                recv_io->_dpdk_io_if.link);
            recv_io->_num_frames_in_use--;
        }
        total_bufs += num_buf;
    }

    return total_bufs;
}

uint16_t dpdk_io_service::_get_unique_client_id()
{
    std::lock_guard<std::mutex> lock(_mutex);
    if (_client_id_set.size() >= MAX_CLIENTS) {
        UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients");
        throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients");
    }

    uint16_t id = _next_client_id++;
    while (_client_id_set.count(id)) {
        id = _next_client_id++;
    }
    _client_id_set.insert(id);
    return id;
}

void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io)
{
    dpdk::wait_req* req;
    if (dpdk_io->is_recv) {
        auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
        req          = recv_io->_waiter;
    } else {
        auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        req          = send_io->_waiter;
    }
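    // Wake the client without blocking the I/O thread: if the waiter's mutex
    // is contended, put this client on the circular retry list and try again
    // on the next _io_worker() iteration.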
    bool stat = req->mutex.try_lock();
    if (stat) {
        bool active_req = !req->complete;
        if (dpdk_io->next) {
            // On the list: Take it off
            if (dpdk_io->next == dpdk_io) {
                // Only node on the list
                _retry_head = NULL;
            } else {
                // Other nodes are on the list
                if (_retry_head == dpdk_io) {
                    // Move list head to next
                    _retry_head = dpdk_io->next;
                }
                dpdk_io->next->prev = dpdk_io->prev;
                dpdk_io->prev->next = dpdk_io->next;
            }
            dpdk_io->next = NULL;
            dpdk_io->prev = NULL;
        }
        if (active_req) {
            req->complete = true;
            req->cond.notify_one();
        }
        req->mutex.unlock();
        if (active_req) {
            wait_req_put(req);
        }
    } else {
        // Put on the retry list, if it isn't already
        if (!dpdk_io->next) {
            if (_retry_head) {
                dpdk_io->next           = _retry_head;
                dpdk_io->prev           = _retry_head->prev;
                _retry_head->prev->next = dpdk_io;
                _retry_head->prev       = dpdk_io;
            } else {
                _retry_head   = dpdk_io;
                dpdk_io->next = dpdk_io;
                dpdk_io->prev = dpdk_io;
            }
        }
    }
}