1 /* 2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 23 * SUCH DAMAGE. 24 */ 25 #include <ctype.h> 26 #include <errno.h> 27 #include <inttypes.h> 28 #include <libnetmap.h> 29 #include <netinet/in.h> /* htonl */ 30 #include <pthread.h> 31 #include <signal.h> 32 #include <stdbool.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <syslog.h> 37 #include <sys/ioctl.h> 38 #include <sys/poll.h> 39 #include <unistd.h> 40 41 #include "pkt_hash.h" 42 #include "ctrs.h" 43 44 45 /* 46 * use our version of header structs, rather than bringing in a ton 47 * of platform specific ones 48 */ 49 #ifndef ETH_ALEN 50 #define ETH_ALEN 6 51 #endif 52 53 struct compact_eth_hdr { 54 unsigned char h_dest[ETH_ALEN]; 55 unsigned char h_source[ETH_ALEN]; 56 u_int16_t h_proto; 57 }; 58 59 struct compact_ip_hdr { 60 u_int8_t ihl:4, version:4; 61 u_int8_t tos; 62 u_int16_t tot_len; 63 u_int16_t id; 64 u_int16_t frag_off; 65 u_int8_t ttl; 66 u_int8_t protocol; 67 u_int16_t check; 68 u_int32_t saddr; 69 u_int32_t daddr; 70 }; 71 72 struct compact_ipv6_hdr { 73 u_int8_t priority:4, version:4; 74 u_int8_t flow_lbl[3]; 75 u_int16_t payload_len; 76 u_int8_t nexthdr; 77 u_int8_t hop_limit; 78 struct in6_addr saddr; 79 struct in6_addr daddr; 80 }; 81 82 #define MAX_IFNAMELEN 64 83 #define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40) 84 #define DEF_OUT_PIPES 2 85 #define DEF_EXTRA_BUFS 0 86 #define DEF_BATCH 2048 87 #define DEF_WAIT_LINK 2 88 #define DEF_STATS_INT 600 89 #define BUF_REVOKE 150 90 #define STAT_MSG_MAXSIZE 1024 91 92 static struct { 93 char ifname[MAX_IFNAMELEN + 1]; 94 char base_name[MAX_IFNAMELEN + 1]; 95 int netmap_fd; 96 uint16_t output_rings; 97 uint16_t num_groups; 98 uint32_t extra_bufs; 99 uint16_t batch; 100 int stdout_interval; 101 int syslog_interval; 102 int wait_link; 103 bool busy_wait; 104 } glob_arg; 105 106 /* 107 * the overflow queue is a circular queue of buffers 108 */ 109 struct overflow_queue { 110 char name[MAX_IFNAMELEN + 16]; 111 struct netmap_slot *slots; 112 uint32_t head; 113 uint32_t tail; 114 uint32_t n; 115 uint32_t size; 116 }; 117 118 static struct overflow_queue *freeq; 119 120 static inline int 121 oq_full(struct overflow_queue *q) 122 { 123 return q->n >= q->size; 124 } 125 126 static inline int 127 oq_empty(struct overflow_queue *q) 128 { 129 return q->n <= 0; 130 } 131 132 static inline void 133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s) 134 { 135 if (unlikely(oq_full(q))) { 136 D("%s: queue full!", q->name); 137 abort(); 138 } 139 q->slots[q->tail] = *s; 140 q->n++; 141 q->tail++; 142 if (q->tail >= q->size) 143 q->tail = 0; 144 } 145 146 static inline struct netmap_slot 147 oq_deq(struct overflow_queue *q) 148 { 149 struct netmap_slot s = q->slots[q->head]; 150 if (unlikely(oq_empty(q))) { 151 D("%s: queue empty!", q->name); 152 abort(); 153 } 154 q->n--; 155 q->head++; 156 if (q->head >= q->size) 157 q->head = 0; 158 return s; 159 } 160 161 static volatile int do_abort = 0; 162 163 static uint64_t dropped = 0; 164 static uint64_t forwarded = 0; 165 static uint64_t received_bytes = 0; 166 static uint64_t received_pkts = 0; 167 static uint64_t non_ip = 0; 168 static uint32_t freeq_n = 0; 169 170 struct port_des { 171 char interface[MAX_PORTNAMELEN]; 172 struct my_ctrs ctr; 173 unsigned int last_sync; 174 uint32_t last_tail; 175 struct overflow_queue *oq; 176 struct nmport_d *nmd; 177 struct netmap_ring *ring; 178 struct group_des *group; 179 }; 180 181 static struct port_des *ports; 182 183 /* each group of pipes receives all the packets */ 184 struct group_des { 185 char pipename[MAX_IFNAMELEN]; 186 struct port_des *ports; 187 int first_id; 188 int nports; 189 int last; 190 int custom_port; 191 }; 192 193 static struct group_des *groups; 194 195 /* statistcs */ 196 struct counters { 197 struct timeval ts; 198 struct my_ctrs *ctrs; 199 uint64_t received_pkts; 200 uint64_t received_bytes; 201 uint64_t non_ip; 202 uint32_t freeq_n; 203 int status __attribute__((aligned(64))); 204 #define COUNTERS_EMPTY 0 205 #define COUNTERS_FULL 1 206 }; 207 208 static struct counters counters_buf; 209 210 static void * 211 print_stats(void *arg) 212 { 213 int npipes = glob_arg.output_rings; 214 int sys_int = 0; 215 (void)arg; 216 struct my_ctrs cur, prev; 217 struct my_ctrs *pipe_prev; 218 219 pipe_prev = calloc(npipes, sizeof(struct my_ctrs)); 220 if (pipe_prev == NULL) { 221 D("out of memory"); 222 exit(1); 223 } 224 225 char stat_msg[STAT_MSG_MAXSIZE] = ""; 226 227 memset(&prev, 0, sizeof(prev)); 228 while (!do_abort) { 229 int j, dosyslog = 0, dostdout = 0, newdata; 230 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0; 231 struct my_ctrs x; 232 233 counters_buf.status = COUNTERS_EMPTY; 234 newdata = 0; 235 memset(&cur, 0, sizeof(cur)); 236 sleep(1); 237 if (counters_buf.status == COUNTERS_FULL) { 238 __sync_synchronize(); 239 newdata = 1; 240 cur.t = counters_buf.ts; 241 if (prev.t.tv_sec || prev.t.tv_usec) { 242 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 + 243 cur.t.tv_usec - prev.t.tv_usec; 244 } 245 } 246 247 ++sys_int; 248 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0) 249 dostdout = 1; 250 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0) 251 dosyslog = 1; 252 253 for (j = 0; j < npipes; ++j) { 254 struct my_ctrs *c = &counters_buf.ctrs[j]; 255 cur.pkts += c->pkts; 256 cur.drop += c->drop; 257 cur.drop_bytes += c->drop_bytes; 258 cur.bytes += c->bytes; 259 260 if (usec) { 261 x.pkts = c->pkts - pipe_prev[j].pkts; 262 x.drop = c->drop - pipe_prev[j].drop; 263 x.bytes = c->bytes - pipe_prev[j].bytes; 264 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes; 265 pps = (x.pkts*1000000 + usec/2) / usec; 266 dps = (x.drop*1000000 + usec/2) / usec; 267 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 268 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 269 } 270 pipe_prev[j] = *c; 271 272 if ( (dosyslog || dostdout) && newdata ) 273 snprintf(stat_msg, STAT_MSG_MAXSIZE, 274 "{" 275 "\"ts\":%.6f," 276 "\"interface\":\"%s\"," 277 "\"output_ring\":%" PRIu16 "," 278 "\"packets_forwarded\":%" PRIu64 "," 279 "\"packets_dropped\":%" PRIu64 "," 280 "\"data_forward_rate_Mbps\":%.4f," 281 "\"data_drop_rate_Mbps\":%.4f," 282 "\"packet_forward_rate_kpps\":%.4f," 283 "\"packet_drop_rate_kpps\":%.4f," 284 "\"overflow_queue_size\":%" PRIu32 285 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 286 ports[j].interface, 287 j, 288 c->pkts, 289 c->drop, 290 (double)bps / 1024 / 1024, 291 (double)dbps / 1024 / 1024, 292 (double)pps / 1000, 293 (double)dps / 1000, 294 c->oq_n); 295 296 if (dosyslog && stat_msg[0]) 297 syslog(LOG_INFO, "%s", stat_msg); 298 if (dostdout && stat_msg[0]) 299 printf("%s\n", stat_msg); 300 } 301 if (usec) { 302 x.pkts = cur.pkts - prev.pkts; 303 x.drop = cur.drop - prev.drop; 304 x.bytes = cur.bytes - prev.bytes; 305 x.drop_bytes = cur.drop_bytes - prev.drop_bytes; 306 pps = (x.pkts*1000000 + usec/2) / usec; 307 dps = (x.drop*1000000 + usec/2) / usec; 308 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 309 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 310 } 311 312 if ( (dosyslog || dostdout) && newdata ) 313 snprintf(stat_msg, STAT_MSG_MAXSIZE, 314 "{" 315 "\"ts\":%.6f," 316 "\"interface\":\"%s\"," 317 "\"output_ring\":null," 318 "\"packets_received\":%" PRIu64 "," 319 "\"packets_forwarded\":%" PRIu64 "," 320 "\"packets_dropped\":%" PRIu64 "," 321 "\"non_ip_packets\":%" PRIu64 "," 322 "\"data_forward_rate_Mbps\":%.4f," 323 "\"data_drop_rate_Mbps\":%.4f," 324 "\"packet_forward_rate_kpps\":%.4f," 325 "\"packet_drop_rate_kpps\":%.4f," 326 "\"free_buffer_slots\":%" PRIu32 327 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 328 glob_arg.ifname, 329 received_pkts, 330 cur.pkts, 331 cur.drop, 332 counters_buf.non_ip, 333 (double)bps / 1024 / 1024, 334 (double)dbps / 1024 / 1024, 335 (double)pps / 1000, 336 (double)dps / 1000, 337 counters_buf.freeq_n); 338 339 if (dosyslog && stat_msg[0]) 340 syslog(LOG_INFO, "%s", stat_msg); 341 if (dostdout && stat_msg[0]) 342 printf("%s\n", stat_msg); 343 344 prev = cur; 345 } 346 347 free(pipe_prev); 348 349 return NULL; 350 } 351 352 static void 353 free_buffers(void) 354 { 355 int i, tot = 0; 356 struct port_des *rxport = &ports[glob_arg.output_rings]; 357 358 /* build a netmap free list with the buffers in all the overflow queues */ 359 for (i = 0; i < glob_arg.output_rings + 1; i++) { 360 struct port_des *cp = &ports[i]; 361 struct overflow_queue *q = cp->oq; 362 363 if (!q) 364 continue; 365 366 while (q->n) { 367 struct netmap_slot s = oq_deq(q); 368 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx); 369 370 *b = rxport->nmd->nifp->ni_bufs_head; 371 rxport->nmd->nifp->ni_bufs_head = s.buf_idx; 372 tot++; 373 } 374 } 375 D("added %d buffers to netmap free list", tot); 376 377 for (i = 0; i < glob_arg.output_rings + 1; ++i) { 378 nmport_close(ports[i].nmd); 379 } 380 } 381 382 383 static void sigint_h(int sig) 384 { 385 (void)sig; /* UNUSED */ 386 do_abort = 1; 387 signal(SIGINT, SIG_DFL); 388 } 389 390 static void 391 usage(void) 392 { 393 printf("usage: lb [options]\n"); 394 printf("where options are:\n"); 395 printf(" -h view help text\n"); 396 printf(" -i iface interface name (required)\n"); 397 printf(" -p [prefix:]npipes add a new group of output pipes\n"); 398 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS); 399 printf(" -b batch batch size (default: %d)\n", DEF_BATCH); 400 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK); 401 printf(" -W enable busy waiting. this will run your CPU at 100%%\n"); 402 printf(" -s seconds seconds between syslog stats messages (default: 0)\n"); 403 printf(" -o seconds seconds between stdout stats messages (default: 0)\n"); 404 exit(0); 405 } 406 407 static int 408 parse_pipes(const char *spec) 409 { 410 const char *end = index(spec, ':'); 411 static int max_groups = 0; 412 struct group_des *g; 413 414 ND("spec %s num_groups %d", spec, glob_arg.num_groups); 415 if (max_groups < glob_arg.num_groups + 1) { 416 size_t size = sizeof(*g) * (glob_arg.num_groups + 1); 417 groups = realloc(groups, size); 418 if (groups == NULL) { 419 D("out of memory"); 420 return 1; 421 } 422 } 423 g = &groups[glob_arg.num_groups]; 424 memset(g, 0, sizeof(*g)); 425 426 if (end != NULL) { 427 if (end - spec > MAX_IFNAMELEN - 8) { 428 D("name '%s' too long", spec); 429 return 1; 430 } 431 if (end == spec) { 432 D("missing prefix before ':' in '%s'", spec); 433 return 1; 434 } 435 strncpy(g->pipename, spec, end - spec); 436 g->custom_port = 1; 437 end++; 438 } else { 439 /* no prefix, this group will use the 440 * name of the input port. 441 * This will be set in init_groups(), 442 * since here the input port may still 443 * be uninitialized 444 */ 445 end = spec; 446 } 447 if (*end == '\0') { 448 g->nports = DEF_OUT_PIPES; 449 } else { 450 g->nports = atoi(end); 451 if (g->nports < 1) { 452 D("invalid number of pipes '%s' (must be at least 1)", end); 453 return 1; 454 } 455 } 456 glob_arg.output_rings += g->nports; 457 glob_arg.num_groups++; 458 return 0; 459 } 460 461 /* complete the initialization of the groups data structure */ 462 static void 463 init_groups(void) 464 { 465 int i, j, t = 0; 466 struct group_des *g = NULL; 467 for (i = 0; i < glob_arg.num_groups; i++) { 468 g = &groups[i]; 469 g->ports = &ports[t]; 470 for (j = 0; j < g->nports; j++) 471 g->ports[j].group = g; 472 t += g->nports; 473 if (!g->custom_port) 474 strcpy(g->pipename, glob_arg.base_name); 475 for (j = 0; j < i; j++) { 476 struct group_des *h = &groups[j]; 477 if (!strcmp(h->pipename, g->pipename)) 478 g->first_id += h->nports; 479 } 480 } 481 g->last = 1; 482 } 483 484 485 /* To support packets that span multiple slots (NS_MOREFRAG) we 486 * need to make sure of the following: 487 * 488 * - all fragments of the same packet must go to the same output pipe 489 * - when dropping, all fragments of the same packet must be dropped 490 * 491 * For the former point we remember and reuse the last hash computed 492 * in each input ring, and only update it when NS_MOREFRAG was not 493 * set in the last received slot (this marks the start of a new packet). 494 * 495 * For the latter point, we only update the output ring head pointer 496 * when an entire packet has been forwarded. We keep a shadow_head 497 * pointer to know where to put the next partial fragment and, 498 * when the need to drop arises, we roll it back to head. 499 */ 500 struct morefrag { 501 uint16_t last_flag; /* for input rings */ 502 uint32_t last_hash; /* for input rings */ 503 uint32_t shadow_head; /* for output rings */ 504 }; 505 506 /* push the packet described by slot rs to the group g. 507 * This may cause other buffers to be pushed down the 508 * chain headed by g. 509 * Return a free buffer. 510 */ 511 static uint32_t 512 forward_packet(struct group_des *g, struct netmap_slot *rs) 513 { 514 uint32_t hash = rs->ptr; 515 uint32_t output_port = hash % g->nports; 516 struct port_des *port = &g->ports[output_port]; 517 struct netmap_ring *ring = port->ring; 518 struct overflow_queue *q = port->oq; 519 struct morefrag *mf = (struct morefrag *)ring->sem; 520 uint16_t curmf = rs->flags & NS_MOREFRAG; 521 522 /* Move the packet to the output pipe, unless there is 523 * either no space left on the ring, or there is some 524 * packet still in the overflow queue (since those must 525 * take precedence over the new one) 526 */ 527 if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) { 528 struct netmap_slot *ts = &ring->slot[mf->shadow_head]; 529 struct netmap_slot old_slot = *ts; 530 531 ts->buf_idx = rs->buf_idx; 532 ts->len = rs->len; 533 ts->flags = rs->flags | NS_BUF_CHANGED; 534 ts->ptr = rs->ptr; 535 mf->shadow_head = nm_ring_next(ring, mf->shadow_head); 536 if (!curmf) { 537 ring->head = mf->shadow_head; 538 } 539 ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u", 540 curmf, ts->flags, mf->shadow_head, ring->head, ring->tail); 541 port->ctr.bytes += rs->len; 542 port->ctr.pkts++; 543 forwarded++; 544 return old_slot.buf_idx; 545 } 546 547 /* use the overflow queue, if available */ 548 if (q == NULL || oq_full(q)) { 549 uint32_t scan; 550 /* no space left on the ring and no overflow queue 551 * available: we are forced to drop the packet 552 */ 553 554 /* drop previous fragments, if any */ 555 for (scan = ring->head; scan != mf->shadow_head; 556 scan = nm_ring_next(ring, scan)) { 557 struct netmap_slot *ts = &ring->slot[scan]; 558 dropped++; 559 port->ctr.drop_bytes += ts->len; 560 } 561 mf->shadow_head = ring->head; 562 563 dropped++; 564 port->ctr.drop++; 565 port->ctr.drop_bytes += rs->len; 566 return rs->buf_idx; 567 } 568 569 oq_enq(q, rs); 570 571 /* 572 * we cannot continue down the chain and we need to 573 * return a free buffer now. We take it from the free queue. 574 */ 575 if (oq_empty(freeq)) { 576 /* the free queue is empty. Revoke some buffers 577 * from the longest overflow queue 578 */ 579 uint32_t j; 580 struct port_des *lp = &ports[0]; 581 uint32_t max = lp->oq->n; 582 583 /* let lp point to the port with the longest queue */ 584 for (j = 1; j < glob_arg.output_rings; j++) { 585 struct port_des *cp = &ports[j]; 586 if (cp->oq->n > max) { 587 lp = cp; 588 max = cp->oq->n; 589 } 590 } 591 592 /* move the oldest BUF_REVOKE buffers from the 593 * lp queue to the free queue 594 * 595 * We cannot revoke a partially received packet. 596 * To make thinks simple we make sure to leave 597 * at least NETMAP_MAX_FRAGS slots in the queue. 598 */ 599 for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) { 600 struct netmap_slot tmp = oq_deq(lp->oq); 601 602 dropped++; 603 lp->ctr.drop++; 604 lp->ctr.drop_bytes += tmp.len; 605 606 oq_enq(freeq, &tmp); 607 } 608 609 ND(1, "revoked %d buffers from %s", j, lq->name); 610 } 611 612 return oq_deq(freeq).buf_idx; 613 } 614 615 int main(int argc, char **argv) 616 { 617 int ch; 618 uint32_t i; 619 int rv; 620 int poll_timeout = 10; /* default */ 621 622 glob_arg.ifname[0] = '\0'; 623 glob_arg.output_rings = 0; 624 glob_arg.batch = DEF_BATCH; 625 glob_arg.wait_link = DEF_WAIT_LINK; 626 glob_arg.busy_wait = false; 627 glob_arg.syslog_interval = 0; 628 glob_arg.stdout_interval = 0; 629 630 while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) { 631 switch (ch) { 632 case 'i': 633 D("interface is %s", optarg); 634 if (strlen(optarg) > MAX_IFNAMELEN - 8) { 635 D("ifname too long %s", optarg); 636 return 1; 637 } 638 if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) { 639 sprintf(glob_arg.ifname, "netmap:%s", optarg); 640 } else { 641 strcpy(glob_arg.ifname, optarg); 642 } 643 break; 644 645 case 'p': 646 if (parse_pipes(optarg)) { 647 usage(); 648 return 1; 649 } 650 break; 651 652 case 'B': 653 glob_arg.extra_bufs = atoi(optarg); 654 D("requested %d extra buffers", glob_arg.extra_bufs); 655 break; 656 657 case 'b': 658 glob_arg.batch = atoi(optarg); 659 D("batch is %d", glob_arg.batch); 660 break; 661 662 case 'w': 663 glob_arg.wait_link = atoi(optarg); 664 D("link wait for up time is %d", glob_arg.wait_link); 665 break; 666 667 case 'W': 668 glob_arg.busy_wait = true; 669 break; 670 671 case 'o': 672 glob_arg.stdout_interval = atoi(optarg); 673 break; 674 675 case 's': 676 glob_arg.syslog_interval = atoi(optarg); 677 break; 678 679 case 'h': 680 usage(); 681 return 0; 682 break; 683 684 default: 685 D("bad option %c %s", ch, optarg); 686 usage(); 687 return 1; 688 } 689 } 690 691 if (glob_arg.ifname[0] == '\0') { 692 D("missing interface name"); 693 usage(); 694 return 1; 695 } 696 697 if (glob_arg.num_groups == 0) 698 parse_pipes(""); 699 700 if (glob_arg.syslog_interval) { 701 setlogmask(LOG_UPTO(LOG_INFO)); 702 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1); 703 } 704 705 uint32_t npipes = glob_arg.output_rings; 706 707 708 pthread_t stat_thread; 709 710 ports = calloc(npipes + 1, sizeof(struct port_des)); 711 if (!ports) { 712 D("failed to allocate the stats array"); 713 return 1; 714 } 715 struct port_des *rxport = &ports[npipes]; 716 717 rxport->nmd = nmport_prepare(glob_arg.ifname); 718 if (rxport->nmd == NULL) { 719 D("cannot parse %s", glob_arg.ifname); 720 return (1); 721 } 722 /* extract the base name */ 723 strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN); 724 725 init_groups(); 726 727 memset(&counters_buf, 0, sizeof(counters_buf)); 728 counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs)); 729 if (!counters_buf.ctrs) { 730 D("failed to allocate the counters snapshot buffer"); 731 return 1; 732 } 733 734 rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs; 735 736 if (nmport_open_desc(rxport->nmd) < 0) { 737 D("cannot open %s", glob_arg.ifname); 738 return (1); 739 } 740 D("successfully opened %s", glob_arg.ifname); 741 742 uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs; 743 struct overflow_queue *oq = NULL; 744 /* reference ring to access the buffers */ 745 rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0); 746 747 if (!glob_arg.extra_bufs) 748 goto run; 749 750 D("obtained %d extra buffers", extra_bufs); 751 if (!extra_bufs) 752 goto run; 753 754 /* one overflow queue for each output pipe, plus one for the 755 * free extra buffers 756 */ 757 oq = calloc(npipes + 1, sizeof(struct overflow_queue)); 758 if (!oq) { 759 D("failed to allocated overflow queues descriptors"); 760 goto run; 761 } 762 763 freeq = &oq[npipes]; 764 rxport->oq = freeq; 765 766 freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 767 if (!freeq->slots) { 768 D("failed to allocate the free list"); 769 } 770 freeq->size = extra_bufs; 771 snprintf(freeq->name, MAX_IFNAMELEN, "free queue"); 772 773 /* 774 * the list of buffers uses the first uint32_t in each buffer 775 * as the index of the next buffer. 776 */ 777 uint32_t scan; 778 for (scan = rxport->nmd->nifp->ni_bufs_head; 779 scan; 780 scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan)) 781 { 782 struct netmap_slot s; 783 s.len = s.flags = 0; 784 s.ptr = 0; 785 s.buf_idx = scan; 786 ND("freeq <- %d", s.buf_idx); 787 oq_enq(freeq, &s); 788 } 789 790 791 if (freeq->n != extra_bufs) { 792 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d", 793 extra_bufs, freeq->n); 794 return 1; 795 } 796 rxport->nmd->nifp->ni_bufs_head = 0; 797 798 run: 799 atexit(free_buffers); 800 801 int j, t = 0; 802 for (j = 0; j < glob_arg.num_groups; j++) { 803 struct group_des *g = &groups[j]; 804 int k; 805 for (k = 0; k < g->nports; ++k) { 806 struct port_des *p = &g->ports[k]; 807 snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d", 808 (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""), 809 g->pipename, g->first_id + k, 810 rxport->nmd->reg.nr_mem_id); 811 D("opening pipe named %s", p->interface); 812 813 p->nmd = nmport_open(p->interface); 814 815 if (p->nmd == NULL) { 816 D("cannot open %s", p->interface); 817 return (1); 818 } else if (p->nmd->mem != rxport->nmd->mem) { 819 D("failed to open pipe #%d in zero-copy mode, " 820 "please close any application that uses either pipe %s}%d, " 821 "or %s{%d, and retry", 822 k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k); 823 return (1); 824 } else { 825 struct morefrag *mf; 826 827 D("successfully opened pipe #%d %s (tx slots: %d)", 828 k + 1, p->interface, p->nmd->reg.nr_tx_slots); 829 p->ring = NETMAP_TXRING(p->nmd->nifp, 0); 830 p->last_tail = nm_ring_next(p->ring, p->ring->tail); 831 mf = (struct morefrag *)p->ring->sem; 832 mf->last_flag = 0; /* unused */ 833 mf->last_hash = 0; /* unused */ 834 mf->shadow_head = p->ring->head; 835 } 836 D("zerocopy %s", 837 (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled"); 838 839 if (extra_bufs) { 840 struct overflow_queue *q = &oq[t + k]; 841 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 842 if (!q->slots) { 843 D("failed to allocate overflow queue for pipe %d", k); 844 /* make all overflow queue management fail */ 845 extra_bufs = 0; 846 } 847 q->size = extra_bufs; 848 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k); 849 p->oq = q; 850 } 851 } 852 t += g->nports; 853 } 854 855 if (glob_arg.extra_bufs && !extra_bufs) { 856 if (oq) { 857 for (i = 0; i < npipes + 1; i++) { 858 free(oq[i].slots); 859 oq[i].slots = NULL; 860 } 861 free(oq); 862 oq = NULL; 863 } 864 D("*** overflow queues disabled ***"); 865 } 866 867 sleep(glob_arg.wait_link); 868 869 /* start stats thread after wait_link */ 870 if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) { 871 D("unable to create the stats thread: %s", strerror(errno)); 872 return 1; 873 } 874 875 struct pollfd pollfd[npipes + 1]; 876 memset(&pollfd, 0, sizeof(pollfd)); 877 signal(SIGINT, sigint_h); 878 879 /* make sure we wake up as often as needed, even when there are no 880 * packets coming in 881 */ 882 if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout) 883 poll_timeout = glob_arg.syslog_interval; 884 if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout) 885 poll_timeout = glob_arg.stdout_interval; 886 887 /* initialize the morefrag structures for the input rings */ 888 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 889 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 890 struct morefrag *mf = (struct morefrag *)rxring->sem; 891 892 mf->last_flag = 0; 893 mf->last_hash = 0; 894 mf->shadow_head = 0; /* unused */ 895 } 896 897 while (!do_abort) { 898 u_int polli = 0; 899 900 for (i = 0; i < npipes; ++i) { 901 struct netmap_ring *ring = ports[i].ring; 902 int pending = nm_tx_pending(ring); 903 904 /* if there are packets pending, we want to be notified when 905 * tail moves, so we let cur=tail 906 */ 907 ring->cur = pending ? ring->tail : ring->head; 908 909 if (!glob_arg.busy_wait && !pending) { 910 /* no need to poll, there are no packets pending */ 911 continue; 912 } 913 pollfd[polli].fd = ports[i].nmd->fd; 914 pollfd[polli].events = POLLOUT; 915 pollfd[polli].revents = 0; 916 ++polli; 917 } 918 919 pollfd[polli].fd = rxport->nmd->fd; 920 pollfd[polli].events = POLLIN; 921 pollfd[polli].revents = 0; 922 ++polli; 923 924 ND(5, "polling %d file descriptors", polli); 925 rv = poll(pollfd, polli, poll_timeout); 926 if (rv <= 0) { 927 if (rv < 0 && errno != EAGAIN && errno != EINTR) 928 RD(1, "poll error %s", strerror(errno)); 929 goto send_stats; 930 } 931 932 /* if there are several groups, try pushing released packets from 933 * upstream groups to the downstream ones. 934 * 935 * It is important to do this before returned slots are reused 936 * for new transmissions. For the same reason, this must be 937 * done starting from the last group going backwards. 938 */ 939 for (i = glob_arg.num_groups - 1U; i > 0; i--) { 940 struct group_des *g = &groups[i - 1]; 941 942 for (j = 0; j < g->nports; j++) { 943 struct port_des *p = &g->ports[j]; 944 struct netmap_ring *ring = p->ring; 945 uint32_t last = p->last_tail, 946 stop = nm_ring_next(ring, ring->tail); 947 948 /* slight abuse of the API here: we touch the slot 949 * pointed to by tail 950 */ 951 for ( ; last != stop; last = nm_ring_next(ring, last)) { 952 struct netmap_slot *rs = &ring->slot[last]; 953 // XXX less aggressive? 954 rs->buf_idx = forward_packet(g + 1, rs); 955 rs->flags = NS_BUF_CHANGED; 956 rs->ptr = 0; 957 } 958 p->last_tail = last; 959 } 960 } 961 962 963 964 if (oq) { 965 /* try to push packets from the overflow queues 966 * to the corresponding pipes 967 */ 968 for (i = 0; i < npipes; i++) { 969 struct port_des *p = &ports[i]; 970 struct overflow_queue *q = p->oq; 971 uint32_t k; 972 int64_t lim; 973 struct netmap_ring *ring; 974 struct netmap_slot *slot; 975 struct morefrag *mf; 976 977 if (oq_empty(q)) 978 continue; 979 ring = p->ring; 980 mf = (struct morefrag *)ring->sem; 981 lim = ring->tail - mf->shadow_head; 982 if (!lim) 983 continue; 984 if (lim < 0) 985 lim += ring->num_slots; 986 if (q->n < lim) 987 lim = q->n; 988 for (k = 0; k < lim; k++) { 989 struct netmap_slot s = oq_deq(q), tmp; 990 tmp.ptr = 0; 991 slot = &ring->slot[mf->shadow_head]; 992 tmp.buf_idx = slot->buf_idx; 993 oq_enq(freeq, &tmp); 994 *slot = s; 995 slot->flags |= NS_BUF_CHANGED; 996 mf->shadow_head = nm_ring_next(ring, mf->shadow_head); 997 if (!(slot->flags & NS_MOREFRAG)) 998 ring->head = mf->shadow_head; 999 } 1000 } 1001 } 1002 1003 /* push any new packets from the input port to the first group */ 1004 int batch = 0; 1005 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 1006 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 1007 struct morefrag *mf = (struct morefrag *)rxring->sem; 1008 1009 //D("prepare to scan rings"); 1010 int next_head = rxring->head; 1011 struct netmap_slot *next_slot = &rxring->slot[next_head]; 1012 const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 1013 while (!nm_ring_empty(rxring)) { 1014 struct netmap_slot *rs = next_slot; 1015 struct group_des *g = &groups[0]; 1016 ++received_pkts; 1017 received_bytes += rs->len; 1018 1019 // CHOOSE THE CORRECT OUTPUT PIPE 1020 // If the previous slot had NS_MOREFRAG set, this is another 1021 // fragment of the last packet and it should go to the same 1022 // output pipe as before. 1023 if (!mf->last_flag) { 1024 // 'B' is just a hashing seed 1025 mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); 1026 } 1027 mf->last_flag = rs->flags & NS_MOREFRAG; 1028 rs->ptr = mf->last_hash; 1029 if (rs->ptr == 0) { 1030 non_ip++; // XXX ?? 1031 } 1032 // prefetch the buffer for the next round 1033 next_head = nm_ring_next(rxring, next_head); 1034 next_slot = &rxring->slot[next_head]; 1035 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 1036 __builtin_prefetch(next_buf); 1037 rs->buf_idx = forward_packet(g, rs); 1038 rs->flags = NS_BUF_CHANGED; 1039 rxring->head = rxring->cur = next_head; 1040 1041 batch++; 1042 if (unlikely(batch >= glob_arg.batch)) { 1043 ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL); 1044 batch = 0; 1045 } 1046 ND(1, 1047 "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f", 1048 forwarded, dropped, 1049 ((float)dropped / (float)forwarded * 100)); 1050 } 1051 1052 } 1053 1054 send_stats: 1055 if (counters_buf.status == COUNTERS_FULL) 1056 continue; 1057 /* take a new snapshot of the counters */ 1058 gettimeofday(&counters_buf.ts, NULL); 1059 for (i = 0; i < npipes; i++) { 1060 struct my_ctrs *c = &counters_buf.ctrs[i]; 1061 *c = ports[i].ctr; 1062 /* 1063 * If there are overflow queues, copy the number of them for each 1064 * port to the ctrs.oq_n variable for each port. 1065 */ 1066 if (ports[i].oq != NULL) 1067 c->oq_n = ports[i].oq->n; 1068 } 1069 counters_buf.received_pkts = received_pkts; 1070 counters_buf.received_bytes = received_bytes; 1071 counters_buf.non_ip = non_ip; 1072 if (freeq != NULL) 1073 counters_buf.freeq_n = freeq->n; 1074 __sync_synchronize(); 1075 counters_buf.status = COUNTERS_FULL; 1076 } 1077 1078 /* 1079 * If freeq exists, copy the number to the freeq_n member of the 1080 * message struct, otherwise set it to 0. 1081 */ 1082 if (freeq != NULL) { 1083 freeq_n = freeq->n; 1084 } else { 1085 freeq_n = 0; 1086 } 1087 1088 pthread_join(stat_thread, NULL); 1089 1090 printf("%"PRIu64" packets forwarded. %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded, 1091 dropped, forwarded + dropped); 1092 return 0; 1093 } 1094