//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/utils/log.hpp>
#include <uhd/utils/thread.hpp>
#include <uhdlib/transport/dpdk/arp.hpp>
#include <uhdlib/transport/dpdk/udp.hpp>
#include <uhdlib/transport/dpdk_io_service_client.hpp>
#include <uhdlib/utils/narrow.hpp>
#include <cmath>

/*
 * Memory management
 *
 * Every object that allocates and frees DPDK memory has a reference to the
 * dpdk_ctx.
 *
 * Ownership hierarchy:
 *
 * dpdk_io_service_mgr (1) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * xport (1) =>
 *     dpdk_send_io::sptr
 *     dpdk_recv_io::sptr
 *
 * usrp link_mgr (1) =>
 *     udp_dpdk_link::sptr
 *
 * dpdk_send_io (2) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * dpdk_recv_io (2) =>
 *     dpdk_ctx::sptr
 *     dpdk_io_service::sptr
 *
 * dpdk_io_service (3) =>
 *     dpdk_ctx::wptr (weak_ptr)
 *     udp_dpdk_link::sptr
 *
 * udp_dpdk_link (4) =>
 *     dpdk_ctx::sptr
 */

using namespace uhd::transport;
51
dpdk_io_service(unsigned int lcore_id,std::vector<dpdk::dpdk_port * > ports,size_t servq_depth)52 dpdk_io_service::dpdk_io_service(
53 unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
54 : _ctx(dpdk::dpdk_ctx::get())
55 , _lcore_id(lcore_id)
56 , _ports(ports)
57 , _servq(servq_depth, lcore_id)
58 {
59 UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id);
60 for (auto port : _ports) {
61 UHD_LOG_TRACE("DPDK::IO_SERVICE",
62 "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id());
63 _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>();
64 _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>();
65 }
66 int status = rte_eal_remote_launch(_io_worker, this, lcore_id);
67 if (status) {
68 throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore");
69 }
70 }
71
make(unsigned int lcore_id,std::vector<dpdk::dpdk_port * > ports,size_t servq_depth)72 dpdk_io_service::sptr dpdk_io_service::make(
73 unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth)
74 {
75 return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth));
76 }
77
~dpdk_io_service()78 dpdk_io_service::~dpdk_io_service()
79 {
80 UHD_LOG_TRACE(
81 "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id);
82 dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL);
83 if (!req) {
84 UHD_LOG_ERROR("DPDK::IO_SERVICE",
85 "Could not allocate request for lcore termination for lcore " << _lcore_id);
86 return;
87 }
88 dpdk::wait_req_get(req);
89 _servq.submit(req, std::chrono::microseconds(-1));
90 dpdk::wait_req_put(req);
91 }
92
attach_recv_link(recv_link_if::sptr link)93 void dpdk_io_service::attach_recv_link(recv_link_if::sptr link)
94 {
95 struct dpdk_flow_data data;
96 data.link = dynamic_cast<udp_dpdk_link*>(link.get());
97 data.is_recv = true;
98 assert(data.link);
99 auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
100 if (!req) {
101 UHD_LOG_ERROR(
102 "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link");
103 throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link");
104 }
105 _servq.submit(req, std::chrono::microseconds(-1));
106 dpdk::wait_req_put(req);
107 {
108 std::lock_guard<std::mutex> lock(_mutex);
109 _recv_links.push_back(link);
110 }
111 }
112
attach_send_link(send_link_if::sptr link)113 void dpdk_io_service::attach_send_link(send_link_if::sptr link)
114 {
115 udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get());
116 assert(dpdk_link);
117
118 // First, fill in destination MAC address
119 struct dpdk::arp_request arp_data;
120 arp_data.tpa = dpdk_link->get_remote_ipv4();
121 arp_data.port = dpdk_link->get_port()->get_port_id();
122 if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) {
123 // If a broadcast IP, skip the ARP and fill with broadcast MAC addr
124 memset(arp_data.tha.addr_bytes, 0xFF, 6);
125 } else {
126 auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
127 if (!arp_req) {
128 UHD_LOG_ERROR(
129 "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request");
130 throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request");
131 }
132 if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) {
133 // Try one more time...
134 auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data);
135 if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) {
136 wait_req_put(arp_req);
137 wait_req_put(arp_req2);
138 throw uhd::io_error("DPDK: Could not reach host");
139 }
140 wait_req_put(arp_req2);
141 }
142 wait_req_put(arp_req);
143 }
144 dpdk_link->set_remote_mac(arp_data.tha);
145
146 // Then, submit the link to the I/O service thread
147 struct dpdk_flow_data data;
148 data.link = dpdk_link;
149 data.is_recv = false;
150 auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data);
151 if (!req) {
152 UHD_LOG_ERROR(
153 "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link");
154 throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link");
155 }
156 _servq.submit(req, std::chrono::microseconds(-1));
157 wait_req_put(req);
158 {
159 std::lock_guard<std::mutex> lock(_mutex);
160 _send_links.push_back(link);
161 }
162 }
163
detach_recv_link(recv_link_if::sptr link)164 void dpdk_io_service::detach_recv_link(recv_link_if::sptr link)
165 {
166 auto link_ptr = link.get();
167 struct dpdk_flow_data data;
168 data.link = dynamic_cast<udp_dpdk_link*>(link_ptr);
169 data.is_recv = true;
170 auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
171 if (!req) {
172 UHD_LOG_ERROR(
173 "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link");
174 throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link");
175 }
176 _servq.submit(req, std::chrono::microseconds(-1));
177 wait_req_put(req);
178 {
179 std::lock_guard<std::mutex> lock(_mutex);
180 _recv_links.remove_if(
181 [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; });
182 }
183 }
184
detach_send_link(send_link_if::sptr link)185 void dpdk_io_service::detach_send_link(send_link_if::sptr link)
186 {
187 auto link_ptr = link.get();
188 struct dpdk_flow_data data;
189 data.link = dynamic_cast<udp_dpdk_link*>(link_ptr);
190 data.is_recv = false;
191 auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data);
192 if (!req) {
193 UHD_LOG_ERROR(
194 "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link");
195 throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link");
196 }
197 _servq.submit(req, std::chrono::microseconds(-1));
198 wait_req_put(req);
199 {
200 std::lock_guard<std::mutex> lock(_mutex);
201 _send_links.remove_if(
202 [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; });
203 }
204 }
205
make_recv_client(recv_link_if::sptr data_link,size_t num_recv_frames,recv_callback_t cb,send_link_if::sptr,size_t num_send_frames,recv_io_if::fc_callback_t fc_cb)206 recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link,
207 size_t num_recv_frames,
208 recv_callback_t cb,
209 send_link_if::sptr /*fc_link*/,
210 size_t num_send_frames,
211 recv_io_if::fc_callback_t fc_cb)
212 {
213 auto link = dynamic_cast<udp_dpdk_link*>(data_link.get());
214 auto recv_io = std::make_shared<dpdk_recv_io>(
215 shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb);
216
217 // Register with I/O service
218 recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get());
219 auto xport_req = dpdk::wait_req_alloc(
220 dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if);
221 _servq.submit(xport_req, std::chrono::microseconds(-1));
222 wait_req_put(xport_req);
223 return recv_io;
224 }
225
make_send_client(send_link_if::sptr send_link,size_t num_send_frames,send_io_if::send_callback_t send_cb,recv_link_if::sptr,size_t num_recv_frames,recv_callback_t recv_cb,send_io_if::fc_callback_t fc_cb)226 send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link,
227 size_t num_send_frames,
228 send_io_if::send_callback_t send_cb,
229 recv_link_if::sptr /*recv_link*/,
230 size_t num_recv_frames,
231 recv_callback_t recv_cb,
232 send_io_if::fc_callback_t fc_cb)
233 {
234 auto link = dynamic_cast<udp_dpdk_link*>(send_link.get());
235 auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(),
236 link,
237 num_send_frames,
238 send_cb,
239 num_recv_frames,
240 recv_cb,
241 fc_cb);
242
243 // Register with I/O service
244 send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get());
245 auto xport_req = dpdk::wait_req_alloc(
246 dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if);
247 _servq.submit(xport_req, std::chrono::microseconds(-1));
248 wait_req_put(xport_req);
249 return send_io;
250 }
251
252
/*! Main loop of the I/O service thread.
 *
 * Launched on a dedicated lcore by the constructor via
 * rte_eal_remote_launch(). \p arg is the owning dpdk_io_service. Returns a
 * negative errno if setup fails; otherwise loops until _service_requests()
 * returns non-zero (lcore termination requested).
 */
int dpdk_io_service::_io_worker(void* arg)
{
    if (!arg)
        return -EINVAL;
    dpdk_io_service* srv = (dpdk_io_service*)arg;

    /* Check that this is a valid lcore */
    unsigned int lcore_id = rte_lcore_id();
    if (lcore_id == LCORE_ID_ANY)
        return -ENODEV;

    /* Check that this lcore has ports */
    if (srv->_ports.size() == 0)
        return -ENODEV;

    char name[16];
    snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id);
    rte_thread_setname(pthread_self(), name);
    UHD_LOG_TRACE("DPDK::IO_SERVICE",
        "I/O service thread '" << name << "' started on lcore " << lcore_id);

    uhd::set_thread_priority_safe();

    // Create the flow table mapping UDP 5-tuples to per-flow client lists
    snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id);
    struct rte_hash_parameters hash_params = {.name = name,
        .entries                                    = MAX_FLOWS,
        .reserved                                   = 0,
        .key_len            = sizeof(struct dpdk::ipv4_5tuple),
        .hash_func          = NULL, // NULL selects DPDK's default hash function
        .hash_func_init_val = 0,
        .socket_id          = uhd::narrow_cast<int>(rte_socket_id()),
        .extra_flag         = 0};
    srv->_rx_table = rte_hash_create(&hash_params);
    if (srv->_rx_table == NULL) {
        return rte_errno;
    }

    int status = 0;
    while (!status) {
        /* For each port, attempt to receive packets and process */
        for (auto port : srv->_ports) {
            srv->_rx_burst(port, 0);
        }
        /* For each port's TX queues, do TX */
        for (auto port : srv->_ports) {
            srv->_tx_burst(port);
        }
        /* For each port's RX release queues, release buffers */
        for (auto port : srv->_ports) {
            srv->_rx_release(port);
        }
        /* Retry waking clients whose wait_req mutex was contended earlier.
         * _retry_head is a circular doubly-linked list, and _wake_client()
         * may unlink nodes as we visit them, so the end node and each next
         * pointer are captured before the call. */
        if (srv->_retry_head) {
            dpdk_io_if* node = srv->_retry_head;
            dpdk_io_if* end  = srv->_retry_head->prev;
            while (true) {
                dpdk_io_if* next = node->next;
                srv->_wake_client(node);
                if (node == end) {
                    break;
                } else {
                    node = next;
                    next = node->next;
                }
            }
        }
        /* Check for open()/close()/term() requests and service 1 at a time
         * Leave this last so we immediately terminate if requested
         */
        status = srv->_service_requests();
    }

    return status;
}
327
_service_requests()328 int dpdk_io_service::_service_requests()
329 {
330 for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) {
331 /* Dequeue */
332 dpdk::wait_req* req = _servq.pop();
333 if (!req) {
334 break;
335 }
336 switch (req->reason) {
337 case dpdk::wait_type::WAIT_SIMPLE:
338 while (_servq.complete(req) == -ENOBUFS)
339 ;
340 break;
341 case dpdk::wait_type::WAIT_RX:
342 case dpdk::wait_type::WAIT_TX_BUF:
343 throw uhd::not_implemented_error(
344 "DPDK: _service_requests(): DPDK is still a WIP");
345 case dpdk::wait_type::WAIT_FLOW_OPEN:
346 _service_flow_open(req);
347 break;
348 case dpdk::wait_type::WAIT_FLOW_CLOSE:
349 _service_flow_close(req);
350 break;
351 case dpdk::wait_type::WAIT_XPORT_CONNECT:
352 _service_xport_connect(req);
353 break;
354 case dpdk::wait_type::WAIT_XPORT_DISCONNECT:
355 _service_xport_disconnect(req);
356 break;
357 case dpdk::wait_type::WAIT_ARP: {
358 assert(req->data != NULL);
359 int arp_status = _service_arp_request(req);
360 assert(arp_status != -ENOMEM);
361 if (arp_status == 0) {
362 while (_servq.complete(req) == -ENOBUFS)
363 ;
364 }
365 break;
366 }
367 case dpdk::wait_type::WAIT_LCORE_TERM:
368 rte_free(_rx_table);
369 while (_servq.complete(req) == -ENOBUFS)
370 ;
371 // Return a positive value to indicate we should terminate
372 return 1;
373 default:
374 UHD_LOG_ERROR(
375 "DPDK::IO_SERVICE", "Invalid reason associated with wait request");
376 while (_servq.complete(req) == -ENOBUFS)
377 ;
378 break;
379 }
380 }
381 return 0;
382 }
383
_service_flow_open(dpdk::wait_req * req)384 void dpdk_io_service::_service_flow_open(dpdk::wait_req* req)
385 {
386 auto flow_req_data = (struct dpdk_flow_data*)req->data;
387 assert(flow_req_data);
388 if (flow_req_data->is_recv) {
389 // If RX, add to RX table. Currently, nothing to do for TX.
390 struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
391 .src_ip = 0,
392 .dst_ip = flow_req_data->link->get_port()->get_ipv4(),
393 .src_port = 0,
394 .dst_port = flow_req_data->link->get_local_port()};
395 // Check the UDP port isn't in use
396 if (rte_hash_lookup(_rx_table, &ht_key) > 0) {
397 req->retval = -EADDRINUSE;
398 UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table");
399 while (_servq.complete(req) == -ENOBUFS)
400 ;
401 return;
402 }
403 // Add xport list for this UDP port
404 auto rx_entry = new std::list<dpdk_io_if*>();
405 if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) {
406 UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table");
407 delete rx_entry;
408 req->retval = -ENOMEM;
409 while (_servq.complete(req) == -ENOBUFS)
410 ;
411 return;
412 }
413 }
414 while (_servq.complete(req) == -ENOBUFS)
415 ;
416 }
417
_service_flow_close(dpdk::wait_req * req)418 void dpdk_io_service::_service_flow_close(dpdk::wait_req* req)
419 {
420 auto flow_req_data = (struct dpdk_flow_data*)req->data;
421 assert(flow_req_data);
422 if (flow_req_data->is_recv) {
423 // If RX, remove from RX table. Currently, nothing to do for TX.
424 struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
425 .src_ip = 0,
426 .dst_ip = flow_req_data->link->get_port()->get_ipv4(),
427 .src_port = 0,
428 .dst_port = flow_req_data->link->get_local_port()};
429 std::list<dpdk_io_if*>* xport_list;
430
431 if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) {
432 UHD_ASSERT_THROW(xport_list->empty());
433 delete xport_list;
434 rte_hash_del_key(_rx_table, &ht_key);
435 while (_servq.complete(req) == -ENOBUFS)
436 ;
437 return;
438 }
439 }
440 while (_servq.complete(req) == -ENOBUFS)
441 ;
442 }
443
/*! Worker-thread handler for WAIT_XPORT_CONNECT.
 *
 * Registers a send or receive client with this service: adds it to the RX
 * flow's mux list (when it has a recv callback), to the per-port client
 * lists, and — for send clients — pre-fills the client's buffer queue with
 * TX frame buffers.
 */
void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req)
{
    auto dpdk_io = static_cast<dpdk_io_if*>(req->data);
    UHD_ASSERT_THROW(dpdk_io);
    auto port = dpdk_io->link->get_port();
    if (dpdk_io->recv_cb) {
        // Add to RX table only if have a callback.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip                                   = port->get_ipv4(),
            .src_port                                 = 0,
            .dst_port = dpdk_io->link->get_local_port()};
        void* hash_data;
        // The flow entry must already exist (created by _service_flow_open)
        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
            req->retval = -ENOENT;
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table");
            while (_servq.complete(req) == -ENOBUFS)
                ;
            return;
        }
        // Add to xport list for this UDP port
        auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
        rx_entry->push_back(dpdk_io);
    }
    if (dpdk_io->is_recv) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request...");
        // Add to xport list for this NIC port
        auto& xport_list = _recv_xport_map.at(port->get_port_id());
        xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client);
    } else {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request...");
        dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        // Add to xport list for this NIC port
        auto& xport_list = _tx_queues.at(port->get_port_id());
        xport_list.push_back(send_io);
        // Pre-fill the client's buffer queue with TX frame buffers so the
        // client can start sending immediately
        for (size_t i = 0; i < send_io->_num_send_frames; i++) {
            auto buff_ptr =
                (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release();
            if (!buff_ptr) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE",
                    "TX mempool out of memory. Please increase dpdk_num_mbufs.");
                break;
            }
            // If the queue is full, hand the mbuf back to the pool and stop
            if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
                rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                break;
            }
            send_io->_num_frames_in_use++;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}
497
/*! Worker-thread handler for WAIT_XPORT_DISCONNECT.
 *
 * Unregisters a send or receive client: removes it from the RX flow's mux
 * list (if it had a recv callback) and the per-port client lists, drains and
 * releases all frame buffers still sitting in its queues, and unlinks it from
 * the circular retry list if present.
 */
void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req)
{
    auto dpdk_io = (struct dpdk_io_if*)req->data;
    assert(dpdk_io);
    auto port = dpdk_io->link->get_port();
    if (dpdk_io->recv_cb) {
        // Remove from RX table only if have a callback.
        struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
            .src_ip                                   = 0,
            .dst_ip                                   = port->get_ipv4(),
            .src_port                                 = 0,
            .dst_port = dpdk_io->link->get_local_port()};
        void* hash_data;
        if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) {
            // Remove from xport list for this UDP port
            auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
            rx_entry->remove(dpdk_io);
        } else {
            req->retval = -EINVAL;
            UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table");
        }
    }
    if (dpdk_io->is_recv) {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request...");
        dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
        // Remove from xport list for this NIC port
        auto& xport_list = _recv_xport_map.at(port->get_port_id());
        xport_list.remove(recv_client);
        // Return all buffers still queued for (or released by) the client
        while (!rte_ring_empty(recv_client->_recv_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr);
            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
        }
        while (!rte_ring_empty(recv_client->_release_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr);
            dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr));
        }
    } else {
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request...");
        dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        // Remove from xport list for this NIC port
        auto& xport_list = _tx_queues.at(port->get_port_id());
        xport_list.remove(send_client);
        // Return all buffers still queued for send or held by the client
        while (!rte_ring_empty(send_client->_send_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr);
            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
        }
        while (!rte_ring_empty(send_client->_buffer_queue)) {
            frame_buff* buff_ptr;
            rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr);
            dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr));
        }
    }
    // Now remove the node if it's on the retry list (circular doubly-linked:
    // a single-node list points to itself)
    if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) {
        _retry_head = NULL;
    } else if (_retry_head) {
        dpdk_io_if* node = _retry_head->next;
        while (node != _retry_head) {
            if (node == dpdk_io) {
                dpdk_io->prev->next = dpdk_io->next;
                dpdk_io->next->prev = dpdk_io->prev;
                break;
            }
            node = node->next;
        }
    }
    while (_servq.complete(req) == -ENOBUFS)
        ;
}
570
/*! Worker-thread handler for WAIT_ARP.
 *
 * Looks up the requested IP in the port's ARP table (under the port's
 * spinlock). If the MAC is known, copies it into the request's arp_request
 * data and returns 0. Otherwise queues the request on the ARP entry, sends
 * an ARP request on the wire, and returns -EAGAIN; _process_arp() completes
 * the request when the reply arrives. Returns -ENOMEM if the table entry
 * cannot be allocated.
 */
int dpdk_io_service::_service_arp_request(dpdk::wait_req* req)
{
    int status        = 0;
    auto arp_req_data = (struct dpdk::arp_request*)req->data;
    dpdk::ipv4_addr dst_addr = arp_req_data->tpa;
    // _ctx is a weak_ptr; it must still be alive while the service runs
    auto ctx_sptr = _ctx.lock();
    UHD_ASSERT_THROW(ctx_sptr);
    dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port);
    UHD_LOG_TRACE("DPDK::IO_SERVICE",
        "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr));

    rte_spinlock_lock(&port->_spinlock);
    struct dpdk::arp_entry* entry = NULL;
    if (port->_arp_table.count(dst_addr) == 0) {
        // No entry yet: allocate one from DPDK memory and construct it
        // in place (placement new), then queue this request on it
        entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
        if (!entry) {
            status = -ENOMEM;
            goto arp_end; // goto ensures the spinlock is released
        }
        entry = new (entry) dpdk::arp_entry();
        entry->reqs.push_back(req);
        port->_arp_table[dst_addr] = entry;
        status = -EAGAIN;
        UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request.");
        _send_arp_request(port, 0, arp_req_data->tpa);
    } else {
        entry = port->_arp_table.at(dst_addr);
        if (is_zero_ether_addr(&entry->mac_addr)) {
            // Entry exists but no reply yet: queue this request too and
            // re-send the ARP request
            UHD_LOG_TRACE("DPDK::IO_SERVICE",
                "ARP: Address in table, but not populated yet. Resending ARP request.");
            port->_arp_table.at(dst_addr)->reqs.push_back(req);
            status = -EAGAIN;
            _send_arp_request(port, 0, arp_req_data->tpa);
        } else {
            // MAC already resolved: return it immediately
            UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table.");
            ether_addr_copy(&entry->mac_addr, &arp_req_data->tha);
            status = 0;
        }
    }
arp_end:
    rte_spinlock_unlock(&port->_spinlock);
    return status;
}
614
_send_arp_request(dpdk::dpdk_port * port,dpdk::queue_id_t queue,dpdk::ipv4_addr ip)615 int dpdk_io_service::_send_arp_request(
616 dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip)
617 {
618 struct rte_mbuf* mbuf;
619 struct ether_hdr* hdr;
620 struct arp_hdr* arp_frame;
621
622 mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool());
623 if (unlikely(mbuf == NULL)) {
624 UHD_LOG_WARNING(
625 "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request");
626 return -ENOMEM;
627 }
628
629 hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
630 arp_frame = (struct arp_hdr*)&hdr[1];
631
632 memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN);
633 hdr->s_addr = port->get_mac_addr();
634 hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP);
635
636 arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER);
637 arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
638 arp_frame->arp_hln = 6;
639 arp_frame->arp_pln = 4;
640 arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST);
641 arp_frame->arp_data.arp_sha = port->get_mac_addr();
642 arp_frame->arp_data.arp_sip = port->get_ipv4();
643 memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN);
644 arp_frame->arp_data.arp_tip = ip;
645
646 mbuf->pkt_len = 42;
647 mbuf->data_len = 42;
648
649 if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) {
650 UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full");
651 rte_pktmbuf_free(mbuf);
652 return -EAGAIN;
653 }
654 return 0;
655 }
656
657 /* Do a burst of RX on port */
/* Do a burst of RX on port */
/*! Receive up to RX_BURST_SIZE packets on \p port / \p queue and dispatch
 * by EtherType: ARP frames go to _process_arp(), IPv4 frames (with a valid
 * hardware-verified IP checksum) to _process_ipv4(), and everything else is
 * dropped. Ownership of each mbuf passes to the handler on the IPv4 path;
 * all other paths free the mbuf here.
 *
 * \return the number of packets received
 */
int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue)
{
    struct ether_hdr* hdr;
    char* l2_data;
    struct rte_mbuf* bufs[RX_BURST_SIZE];
    const uint16_t num_rx =
        rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE);
    if (unlikely(num_rx == 0)) {
        return 0;
    }

    for (int buf = 0; buf < num_rx; buf++) {
        uint64_t ol_flags = bufs[buf]->ol_flags;
        hdr               = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*);
        l2_data           = (char*)&hdr[1];
        switch (rte_be_to_cpu_16(hdr->ether_type)) {
            case ETHER_TYPE_ARP:
                _process_arp(port, queue, (struct arp_hdr*)l2_data);
                rte_pktmbuf_free(bufs[buf]);
                break;
            case ETHER_TYPE_IPv4:
                // Rely on NIC RX checksum offload flags to validate the
                // IP header before handing the packet on
                if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) {
                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum");
                } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) {
                    UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum");
                } else {
                    // NOTE(review): mbuf is intentionally NOT freed here —
                    // _process_ipv4() takes ownership. The bad/missing-cksum
                    // branches above appear to leak the mbuf; confirm upstream.
                    _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data);
                }
                break;
            default:
                rte_pktmbuf_free(bufs[buf]);
                break;
        }
    }
    return num_rx;
}
694
_process_arp(dpdk::dpdk_port * port,dpdk::queue_id_t queue_id,struct arp_hdr * arp_frame)695 int dpdk_io_service::_process_arp(
696 dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame)
697 {
698 uint32_t dest_ip = arp_frame->arp_data.arp_sip;
699 struct ether_addr dest_addr = arp_frame->arp_data.arp_sha;
700 UHD_LOG_TRACE("DPDK::IO_SERVICE",
701 "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> "
702 << dpdk::eth_addr_to_string(dest_addr));
703 /* Add entry to ARP table */
704 rte_spinlock_lock(&port->_spinlock);
705 struct dpdk::arp_entry* entry = NULL;
706 if (port->_arp_table.count(dest_ip) == 0) {
707 entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0);
708 if (!entry) {
709 return -ENOMEM;
710 }
711 entry = new (entry) dpdk::arp_entry();
712 ether_addr_copy(&dest_addr, &entry->mac_addr);
713 port->_arp_table[dest_ip] = entry;
714 } else {
715 entry = port->_arp_table.at(dest_ip);
716 ether_addr_copy(&dest_addr, &entry->mac_addr);
717 for (auto req : entry->reqs) {
718 auto arp_data = (struct dpdk::arp_request*)req->data;
719 ether_addr_copy(&dest_addr, &arp_data->tha);
720 while (_servq.complete(req) == -ENOBUFS)
721 ;
722 }
723 entry->reqs.clear();
724 }
725 rte_spinlock_unlock(&port->_spinlock);
726
727 /* Respond if this was an ARP request */
728 if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST)
729 && arp_frame->arp_data.arp_tip == port->get_ipv4()) {
730 UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply.");
731 port->_arp_reply(queue_id, arp_frame);
732 }
733
734 return 0;
735 }
736
_process_ipv4(dpdk::dpdk_port * port,struct rte_mbuf * mbuf,struct ipv4_hdr * pkt)737 int dpdk_io_service::_process_ipv4(
738 dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt)
739 {
740 bool bcast = port->dst_is_broadcast(pkt->dst_addr);
741 if (pkt->dst_addr != port->get_ipv4() && !bcast) {
742 rte_pktmbuf_free(mbuf);
743 return -ENODEV;
744 }
745 if (pkt->next_proto_id == IPPROTO_UDP) {
746 return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast);
747 }
748 rte_pktmbuf_free(mbuf);
749 return -EINVAL;
750 }
751
752
/*! Dispatch a received UDP packet to the matching receive client.
 *
 * Looks up the flow (keyed on local IP + destination UDP port) in the RX
 * table, wraps the mbuf in a frame_buff via the flow's link, and offers it
 * to each muxed receiver's recv callback until one claims it. A claiming
 * client gets the buffer enqueued on its recv queue and is woken; if no
 * client claims it, the buffer is released back to the link.
 *
 * \return 0 on success, -ENOENT if no flow entry or no receiver matched
 */
int dpdk_io_service::_process_udp(
    dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/)
{
    // Get the link
    struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP,
        .src_ip                                   = 0,
        .dst_ip                                   = port->get_ipv4(),
        .src_port                                 = 0,
        .dst_port                                 = pkt->dst_port};
    void* hash_data;
    if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table");
        rte_pktmbuf_free(mbuf);
        return -ENOENT;
    }
    // Get xport list for this UDP port
    auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data);
    if (rx_entry->empty()) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link");
        rte_pktmbuf_free(mbuf);
        return -ENOENT;
    }
    // Turn rte_mbuf -> dpdk_frame_buff. All clients on this flow share one
    // link, so the front entry's link is used for the conversion.
    auto link = rx_entry->front()->link;
    link->enqueue_recv_mbuf(mbuf);
    auto buff       = link->get_recv_buff(0);
    bool rcvr_found = false;
    for (auto client_if : *rx_entry) {
        // Check all the muxed receivers...
        if (client_if->recv_cb(buff, link, link)) {
            rcvr_found = true;
            // The callback may have consumed the buffer itself; only enqueue
            // if it is still held here
            if (buff) {
                assert(client_if->is_recv);
                auto recv_io  = (dpdk_recv_io*)client_if->io_client;
                auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release();
                if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) {
                    // Client's queue is full: drop the packet
                    rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                    UHD_LOG_WARNING(
                        "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue");
                } else {
                    recv_io->_num_frames_in_use++;
                    assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames);
                    _wake_client(client_if);
                }
            }
            break;
        }
    }
    if (!rcvr_found) {
        UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found");
        // Release the buffer if no receiver found
        link->release_recv_buff(std::move(buff));
        return -ENOENT;
    }
    return 0;
}
809
810 /* Do a burst of TX on port's tx queues */
/* Do a burst of TX on port's tx queues */
/*! Transmit up to TX_BURST_SIZE queued frames per send client on \p port.
 *
 * For each client: respects the client's flow-control callback, sends each
 * dequeued frame via the client's send callback, and tries to replace the
 * consumed buffer in the client's buffer queue. Clients that received
 * replacement buffers are woken so they can continue sending.
 *
 * \return the total number of frames processed across all clients
 */
int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port)
{
    unsigned int total_tx = 0;
    auto& queues          = _tx_queues.at(port->get_port_id());

    for (auto& send_io : queues) {
        // Cap this client's work at what is queued now, up to the burst size
        unsigned int num_tx = rte_ring_count(send_io->_send_queue);
        num_tx              = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE;
        bool replaced_buffers = false;
        for (unsigned int i = 0; i < num_tx; i++) {
            size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size();
            // Stop early if flow control says the remote can't take more
            if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) {
                break;
            }
            dpdk::dpdk_frame_buff* buff_ptr;
            int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr);
            if (status) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual");
                break;
            }
            send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link);
            // Attempt to replace buffer
            buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0)
                           .release();
            if (!buff_ptr) {
                UHD_LOG_ERROR("DPDK::IO_SERVICE",
                    "TX mempool out of memory. Please increase dpdk_num_mbufs.");
                send_io->_num_frames_in_use--;
            } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) {
                // Buffer queue full: hand the mbuf back to the pool
                rte_pktmbuf_free(buff_ptr->get_pktmbuf());
                send_io->_num_frames_in_use--;
            } else {
                replaced_buffers = true;
            }
        }
        if (replaced_buffers) {
            // New buffers are available; wake the client so it can send more
            _wake_client(&send_io->_dpdk_io_if);
        }
        total_tx += num_tx;
    }

    return total_tx;
}
854
_rx_release(dpdk::dpdk_port * port)855 int dpdk_io_service::_rx_release(dpdk::dpdk_port* port)
856 {
857 unsigned int total_bufs = 0;
858 auto& queues = _recv_xport_map.at(port->get_port_id());
859
860 for (auto& recv_io : queues) {
861 unsigned int num_buf = rte_ring_count(recv_io->_release_queue);
862 num_buf = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE;
863 for (unsigned int i = 0; i < num_buf; i++) {
864 dpdk::dpdk_frame_buff* buff_ptr;
865 int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr);
866 if (status) {
867 UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual");
868 break;
869 }
870 recv_io->_fc_cb(frame_buff::uptr(buff_ptr),
871 recv_io->_dpdk_io_if.link,
872 recv_io->_dpdk_io_if.link);
873 recv_io->_num_frames_in_use--;
874 }
875 total_bufs += num_buf;
876 }
877
878 return total_bufs;
879 }
880
_get_unique_client_id()881 uint16_t dpdk_io_service::_get_unique_client_id()
882 {
883 std::lock_guard<std::mutex> lock(_mutex);
884 if (_client_id_set.size() >= MAX_CLIENTS) {
885 UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients");
886 throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients");
887 }
888
889 uint16_t id = _next_client_id++;
890 while (_client_id_set.count(id)) {
891 id = _next_client_id++;
892 }
893 _client_id_set.insert(id);
894 return id;
895 }
896
/*! Wake a client blocked on its waiter request.
 *
 * Uses try_lock on the waiter's mutex so the I/O thread never blocks: on
 * success the request is completed and the client notified (and the node is
 * removed from the circular retry list if present); on contention the node
 * is appended to the retry list so _io_worker() tries again next pass.
 */
void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io)
{
    dpdk::wait_req* req;
    if (dpdk_io->is_recv) {
        auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client);
        req          = recv_io->_waiter;
    } else {
        auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client);
        req          = send_io->_waiter;
    }
    bool stat = req->mutex.try_lock();
    if (stat) {
        // Only an incomplete request needs the wake + refcount drop below
        bool active_req = !req->complete;
        if (dpdk_io->next) {
            // On the list: Take it off
            if (dpdk_io->next == dpdk_io) {
                // Only node on the list
                _retry_head = NULL;
            } else {
                // Other nodes are on the list
                if (_retry_head == dpdk_io) {
                    // Move list head to next
                    _retry_head = dpdk_io->next;
                }
                dpdk_io->next->prev = dpdk_io->prev;
                dpdk_io->prev->next = dpdk_io->next;
            }
            dpdk_io->next = NULL;
            dpdk_io->prev = NULL;
        }
        if (active_req) {
            req->complete = true;
            req->cond.notify_one();
        }
        req->mutex.unlock();
        // Drop the reference outside the critical section
        if (active_req) {
            wait_req_put(req);
        }
    } else {
        // Put on the retry list, if it isn't already (next != NULL means the
        // node is already linked; the list is circular and doubly-linked)
        if (!dpdk_io->next) {
            if (_retry_head) {
                dpdk_io->next           = _retry_head;
                dpdk_io->prev           = _retry_head->prev;
                _retry_head->prev->next = dpdk_io;
                _retry_head->prev       = dpdk_io;
            } else {
                _retry_head   = dpdk_io;
                dpdk_io->next = dpdk_io;
                dpdk_io->prev = dpdk_io;
            }
        }
    }
}
951