/*
 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/* $FreeBSD$ */

#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdbool.h>
#include <inttypes.h>
#include <syslog.h>

/* pull in the inline helper library (nm_open, D/ND/RD macros, ...) */
#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
#include <sys/poll.h>

#include <netinet/in.h>		/* htonl */

#include <pthread.h>

#include "pkt_hash.h"
#include "ctrs.h"


/*
 * use our version of header structs, rather than bringing in a ton
 * of platform specific ones
 */
#ifndef ETH_ALEN
#define ETH_ALEN 6
#endif

/* minimal Ethernet header, used only by the hashing code */
struct compact_eth_hdr {
	unsigned char h_dest[ETH_ALEN];
	unsigned char h_source[ETH_ALEN];
	u_int16_t h_proto;
};

/* minimal IPv4 header (bit-field order assumes little-endian layout
 * as written — NOTE(review): confirm on big-endian targets)
 */
struct compact_ip_hdr {
	u_int8_t ihl:4, version:4;
	u_int8_t tos;
	u_int16_t tot_len;
	u_int16_t id;
	u_int16_t frag_off;
	u_int8_t ttl;
	u_int8_t protocol;
	u_int16_t check;
	u_int32_t saddr;
	u_int32_t daddr;
};

/* minimal IPv6 header */
struct compact_ipv6_hdr {
	u_int8_t priority:4, version:4;
	u_int8_t flow_lbl[3];
	u_int16_t payload_len;
	u_int8_t nexthdr;
	u_int8_t hop_limit;
	struct in6_addr saddr;
	struct in6_addr daddr;
};

#define MAX_IFNAMELEN	64
#define MAX_PORTNAMELEN	(MAX_IFNAMELEN + 40)
#define DEF_OUT_PIPES	2	/* default pipes per group (-p) */
#define DEF_EXTRA_BUFS	0	/* default extra buffers (-B) */
#define DEF_BATCH	2048	/* default rx batch size (-b) */
#define DEF_WAIT_LINK	2	/* default link-up wait, seconds (-w) */
#define DEF_STATS_INT	600
#define BUF_REVOKE	100	/* buffers reclaimed from the longest queue */
#define STAT_MSG_MAXSIZE 1024

/* global configuration, filled in by getopt in main() */
struct {
	char ifname[MAX_IFNAMELEN];	/* input port (netmap:... or vale...) */
	char base_name[MAX_IFNAMELEN];	/* ifname stripped of prefix/suffixes */
	int netmap_fd;
	uint16_t output_rings;		/* total number of output pipes */
	uint16_t num_groups;		/* number of pipe groups (-p count) */
	uint32_t extra_bufs;		/* requested extra netmap buffers */
	uint16_t batch;			/* rx sync batch size */
	int stdout_interval;		/* seconds between stdout stats, 0=off */
	int syslog_interval;		/* seconds between syslog stats, 0=off */
	int wait_link;			/* seconds to sleep before starting */
	bool busy_wait;			/* poll output rings even when idle */
} glob_arg;

/*
 * the overflow queue is a circular queue of buffers
 */
struct overflow_queue {
	char name[MAX_IFNAMELEN + 16];
	struct netmap_slot *slots;	/* backing array, 'size' entries */
	uint32_t head;			/* dequeue position */
	uint32_t tail;			/* enqueue position */
	uint32_t n;			/* current number of queued slots */
	uint32_t size;			/* capacity */
};

/* queue of free extra buffers, shared by all ports; may stay NULL */
struct overflow_queue *freeq;
120 static inline int 121 oq_full(struct overflow_queue *q) 122 { 123 return q->n >= q->size; 124 } 125 126 static inline int 127 oq_empty(struct overflow_queue *q) 128 { 129 return q->n <= 0; 130 } 131 132 static inline void 133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s) 134 { 135 if (unlikely(oq_full(q))) { 136 D("%s: queue full!", q->name); 137 abort(); 138 } 139 q->slots[q->tail] = *s; 140 q->n++; 141 q->tail++; 142 if (q->tail >= q->size) 143 q->tail = 0; 144 } 145 146 static inline struct netmap_slot 147 oq_deq(struct overflow_queue *q) 148 { 149 struct netmap_slot s = q->slots[q->head]; 150 if (unlikely(oq_empty(q))) { 151 D("%s: queue empty!", q->name); 152 abort(); 153 } 154 q->n--; 155 q->head++; 156 if (q->head >= q->size) 157 q->head = 0; 158 return s; 159 } 160 161 static volatile int do_abort = 0; 162 163 uint64_t dropped = 0; 164 uint64_t forwarded = 0; 165 uint64_t received_bytes = 0; 166 uint64_t received_pkts = 0; 167 uint64_t non_ip = 0; 168 uint32_t freeq_n = 0; 169 170 struct port_des { 171 char interface[MAX_PORTNAMELEN]; 172 struct my_ctrs ctr; 173 unsigned int last_sync; 174 uint32_t last_tail; 175 struct overflow_queue *oq; 176 struct nm_desc *nmd; 177 struct netmap_ring *ring; 178 struct group_des *group; 179 }; 180 181 struct port_des *ports; 182 183 /* each group of pipes receives all the packets */ 184 struct group_des { 185 char pipename[MAX_IFNAMELEN]; 186 struct port_des *ports; 187 int first_id; 188 int nports; 189 int last; 190 int custom_port; 191 }; 192 193 struct group_des *groups; 194 195 /* statistcs */ 196 struct counters { 197 struct timeval ts; 198 struct my_ctrs *ctrs; 199 uint64_t received_pkts; 200 uint64_t received_bytes; 201 uint64_t non_ip; 202 uint32_t freeq_n; 203 int status __attribute__((aligned(64))); 204 #define COUNTERS_EMPTY 0 205 #define COUNTERS_FULL 1 206 }; 207 208 struct counters counters_buf; 209 210 static void * 211 print_stats(void *arg) 212 { 213 int npipes = glob_arg.output_rings; 
214 int sys_int = 0; 215 (void)arg; 216 struct my_ctrs cur, prev; 217 struct my_ctrs *pipe_prev; 218 219 pipe_prev = calloc(npipes, sizeof(struct my_ctrs)); 220 if (pipe_prev == NULL) { 221 D("out of memory"); 222 exit(1); 223 } 224 225 char stat_msg[STAT_MSG_MAXSIZE] = ""; 226 227 memset(&prev, 0, sizeof(prev)); 228 while (!do_abort) { 229 int j, dosyslog = 0, dostdout = 0, newdata; 230 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0; 231 struct my_ctrs x; 232 233 counters_buf.status = COUNTERS_EMPTY; 234 newdata = 0; 235 memset(&cur, 0, sizeof(cur)); 236 sleep(1); 237 if (counters_buf.status == COUNTERS_FULL) { 238 __sync_synchronize(); 239 newdata = 1; 240 cur.t = counters_buf.ts; 241 if (prev.t.tv_sec || prev.t.tv_usec) { 242 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 + 243 cur.t.tv_usec - prev.t.tv_usec; 244 } 245 } 246 247 ++sys_int; 248 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0) 249 dostdout = 1; 250 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0) 251 dosyslog = 1; 252 253 for (j = 0; j < npipes; ++j) { 254 struct my_ctrs *c = &counters_buf.ctrs[j]; 255 cur.pkts += c->pkts; 256 cur.drop += c->drop; 257 cur.drop_bytes += c->drop_bytes; 258 cur.bytes += c->bytes; 259 260 if (usec) { 261 x.pkts = c->pkts - pipe_prev[j].pkts; 262 x.drop = c->drop - pipe_prev[j].drop; 263 x.bytes = c->bytes - pipe_prev[j].bytes; 264 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes; 265 pps = (x.pkts*1000000 + usec/2) / usec; 266 dps = (x.drop*1000000 + usec/2) / usec; 267 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 268 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 269 } 270 pipe_prev[j] = *c; 271 272 if ( (dosyslog || dostdout) && newdata ) 273 snprintf(stat_msg, STAT_MSG_MAXSIZE, 274 "{" 275 "\"ts\":%.6f," 276 "\"interface\":\"%s\"," 277 "\"output_ring\":%" PRIu16 "," 278 "\"packets_forwarded\":%" PRIu64 "," 279 "\"packets_dropped\":%" PRIu64 "," 280 "\"data_forward_rate_Mbps\":%.4f," 281 
"\"data_drop_rate_Mbps\":%.4f," 282 "\"packet_forward_rate_kpps\":%.4f," 283 "\"packet_drop_rate_kpps\":%.4f," 284 "\"overflow_queue_size\":%" PRIu32 285 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 286 ports[j].interface, 287 j, 288 c->pkts, 289 c->drop, 290 (double)bps / 1024 / 1024, 291 (double)dbps / 1024 / 1024, 292 (double)pps / 1000, 293 (double)dps / 1000, 294 c->oq_n); 295 296 if (dosyslog && stat_msg[0]) 297 syslog(LOG_INFO, "%s", stat_msg); 298 if (dostdout && stat_msg[0]) 299 printf("%s\n", stat_msg); 300 } 301 if (usec) { 302 x.pkts = cur.pkts - prev.pkts; 303 x.drop = cur.drop - prev.drop; 304 x.bytes = cur.bytes - prev.bytes; 305 x.drop_bytes = cur.drop_bytes - prev.drop_bytes; 306 pps = (x.pkts*1000000 + usec/2) / usec; 307 dps = (x.drop*1000000 + usec/2) / usec; 308 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 309 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 310 } 311 312 if ( (dosyslog || dostdout) && newdata ) 313 snprintf(stat_msg, STAT_MSG_MAXSIZE, 314 "{" 315 "\"ts\":%.6f," 316 "\"interface\":\"%s\"," 317 "\"output_ring\":null," 318 "\"packets_received\":%" PRIu64 "," 319 "\"packets_forwarded\":%" PRIu64 "," 320 "\"packets_dropped\":%" PRIu64 "," 321 "\"non_ip_packets\":%" PRIu64 "," 322 "\"data_forward_rate_Mbps\":%.4f," 323 "\"data_drop_rate_Mbps\":%.4f," 324 "\"packet_forward_rate_kpps\":%.4f," 325 "\"packet_drop_rate_kpps\":%.4f," 326 "\"free_buffer_slots\":%" PRIu32 327 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 328 glob_arg.ifname, 329 received_pkts, 330 cur.pkts, 331 cur.drop, 332 counters_buf.non_ip, 333 (double)bps / 1024 / 1024, 334 (double)dbps / 1024 / 1024, 335 (double)pps / 1000, 336 (double)dps / 1000, 337 counters_buf.freeq_n); 338 339 if (dosyslog && stat_msg[0]) 340 syslog(LOG_INFO, "%s", stat_msg); 341 if (dostdout && stat_msg[0]) 342 printf("%s\n", stat_msg); 343 344 prev = cur; 345 } 346 347 free(pipe_prev); 348 349 return NULL; 350 } 351 352 static void 353 free_buffers(void) 354 { 355 int i, tot = 0; 
356 struct port_des *rxport = &ports[glob_arg.output_rings]; 357 358 /* build a netmap free list with the buffers in all the overflow queues */ 359 for (i = 0; i < glob_arg.output_rings + 1; i++) { 360 struct port_des *cp = &ports[i]; 361 struct overflow_queue *q = cp->oq; 362 363 if (!q) 364 continue; 365 366 while (q->n) { 367 struct netmap_slot s = oq_deq(q); 368 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx); 369 370 *b = rxport->nmd->nifp->ni_bufs_head; 371 rxport->nmd->nifp->ni_bufs_head = s.buf_idx; 372 tot++; 373 } 374 } 375 D("added %d buffers to netmap free list", tot); 376 377 for (i = 0; i < glob_arg.output_rings + 1; ++i) { 378 nm_close(ports[i].nmd); 379 } 380 } 381 382 383 static void sigint_h(int sig) 384 { 385 (void)sig; /* UNUSED */ 386 do_abort = 1; 387 signal(SIGINT, SIG_DFL); 388 } 389 390 void usage() 391 { 392 printf("usage: lb [options]\n"); 393 printf("where options are:\n"); 394 printf(" -h view help text\n"); 395 printf(" -i iface interface name (required)\n"); 396 printf(" -p [prefix:]npipes add a new group of output pipes\n"); 397 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS); 398 printf(" -b batch batch size (default: %d)\n", DEF_BATCH); 399 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK); 400 printf(" -W enable busy waiting. 
this will run your CPU at 100%%\n"); 401 printf(" -s seconds seconds between syslog stats messages (default: 0)\n"); 402 printf(" -o seconds seconds between stdout stats messages (default: 0)\n"); 403 exit(0); 404 } 405 406 static int 407 parse_pipes(char *spec) 408 { 409 char *end = index(spec, ':'); 410 static int max_groups = 0; 411 struct group_des *g; 412 413 ND("spec %s num_groups %d", spec, glob_arg.num_groups); 414 if (max_groups < glob_arg.num_groups + 1) { 415 size_t size = sizeof(*g) * (glob_arg.num_groups + 1); 416 groups = realloc(groups, size); 417 if (groups == NULL) { 418 D("out of memory"); 419 return 1; 420 } 421 } 422 g = &groups[glob_arg.num_groups]; 423 memset(g, 0, sizeof(*g)); 424 425 if (end != NULL) { 426 if (end - spec > MAX_IFNAMELEN - 8) { 427 D("name '%s' too long", spec); 428 return 1; 429 } 430 if (end == spec) { 431 D("missing prefix before ':' in '%s'", spec); 432 return 1; 433 } 434 strncpy(g->pipename, spec, end - spec); 435 g->custom_port = 1; 436 end++; 437 } else { 438 /* no prefix, this group will use the 439 * name of the input port. 
440 * This will be set in init_groups(), 441 * since here the input port may still 442 * be uninitialized 443 */ 444 end = spec; 445 } 446 if (*end == '\0') { 447 g->nports = DEF_OUT_PIPES; 448 } else { 449 g->nports = atoi(end); 450 if (g->nports < 1) { 451 D("invalid number of pipes '%s' (must be at least 1)", end); 452 return 1; 453 } 454 } 455 glob_arg.output_rings += g->nports; 456 glob_arg.num_groups++; 457 return 0; 458 } 459 460 /* complete the initialization of the groups data structure */ 461 void init_groups(void) 462 { 463 int i, j, t = 0; 464 struct group_des *g = NULL; 465 for (i = 0; i < glob_arg.num_groups; i++) { 466 g = &groups[i]; 467 g->ports = &ports[t]; 468 for (j = 0; j < g->nports; j++) 469 g->ports[j].group = g; 470 t += g->nports; 471 if (!g->custom_port) 472 strcpy(g->pipename, glob_arg.base_name); 473 for (j = 0; j < i; j++) { 474 struct group_des *h = &groups[j]; 475 if (!strcmp(h->pipename, g->pipename)) 476 g->first_id += h->nports; 477 } 478 } 479 g->last = 1; 480 } 481 482 /* push the packet described by slot rs to the group g. 483 * This may cause other buffers to be pushed down the 484 * chain headed by g. 485 * Return a free buffer. 
486 */ 487 uint32_t forward_packet(struct group_des *g, struct netmap_slot *rs) 488 { 489 uint32_t hash = rs->ptr; 490 uint32_t output_port = hash % g->nports; 491 struct port_des *port = &g->ports[output_port]; 492 struct netmap_ring *ring = port->ring; 493 struct overflow_queue *q = port->oq; 494 495 /* Move the packet to the output pipe, unless there is 496 * either no space left on the ring, or there is some 497 * packet still in the overflow queue (since those must 498 * take precedence over the new one) 499 */ 500 if (ring->head != ring->tail && (q == NULL || oq_empty(q))) { 501 struct netmap_slot *ts = &ring->slot[ring->head]; 502 struct netmap_slot old_slot = *ts; 503 504 ts->buf_idx = rs->buf_idx; 505 ts->len = rs->len; 506 ts->flags |= NS_BUF_CHANGED; 507 ts->ptr = rs->ptr; 508 ring->head = nm_ring_next(ring, ring->head); 509 port->ctr.bytes += rs->len; 510 port->ctr.pkts++; 511 forwarded++; 512 return old_slot.buf_idx; 513 } 514 515 /* use the overflow queue, if available */ 516 if (q == NULL || oq_full(q)) { 517 /* no space left on the ring and no overflow queue 518 * available: we are forced to drop the packet 519 */ 520 dropped++; 521 port->ctr.drop++; 522 port->ctr.drop_bytes += rs->len; 523 return rs->buf_idx; 524 } 525 526 oq_enq(q, rs); 527 528 /* 529 * we cannot continue down the chain and we need to 530 * return a free buffer now. We take it from the free queue. 531 */ 532 if (oq_empty(freeq)) { 533 /* the free queue is empty. 
Revoke some buffers 534 * from the longest overflow queue 535 */ 536 uint32_t j; 537 struct port_des *lp = &ports[0]; 538 uint32_t max = lp->oq->n; 539 540 /* let lp point to the port with the longest queue */ 541 for (j = 1; j < glob_arg.output_rings; j++) { 542 struct port_des *cp = &ports[j]; 543 if (cp->oq->n > max) { 544 lp = cp; 545 max = cp->oq->n; 546 } 547 } 548 549 /* move the oldest BUF_REVOKE buffers from the 550 * lp queue to the free queue 551 */ 552 // XXX optimize this cycle 553 for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) { 554 struct netmap_slot tmp = oq_deq(lp->oq); 555 556 dropped++; 557 lp->ctr.drop++; 558 lp->ctr.drop_bytes += tmp.len; 559 560 oq_enq(freeq, &tmp); 561 } 562 563 ND(1, "revoked %d buffers from %s", j, lq->name); 564 } 565 566 return oq_deq(freeq).buf_idx; 567 } 568 569 int main(int argc, char **argv) 570 { 571 int ch; 572 uint32_t i; 573 int rv; 574 unsigned int iter = 0; 575 int poll_timeout = 10; /* default */ 576 577 glob_arg.ifname[0] = '\0'; 578 glob_arg.output_rings = 0; 579 glob_arg.batch = DEF_BATCH; 580 glob_arg.wait_link = DEF_WAIT_LINK; 581 glob_arg.busy_wait = false; 582 glob_arg.syslog_interval = 0; 583 glob_arg.stdout_interval = 0; 584 585 while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) { 586 switch (ch) { 587 case 'i': 588 D("interface is %s", optarg); 589 if (strlen(optarg) > MAX_IFNAMELEN - 8) { 590 D("ifname too long %s", optarg); 591 return 1; 592 } 593 if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) { 594 sprintf(glob_arg.ifname, "netmap:%s", optarg); 595 } else { 596 strcpy(glob_arg.ifname, optarg); 597 } 598 break; 599 600 case 'p': 601 if (parse_pipes(optarg)) { 602 usage(); 603 return 1; 604 } 605 break; 606 607 case 'B': 608 glob_arg.extra_bufs = atoi(optarg); 609 D("requested %d extra buffers", glob_arg.extra_bufs); 610 break; 611 612 case 'b': 613 glob_arg.batch = atoi(optarg); 614 D("batch is %d", glob_arg.batch); 615 break; 616 617 case 'w': 618 glob_arg.wait_link = 
atoi(optarg); 619 D("link wait for up time is %d", glob_arg.wait_link); 620 break; 621 622 case 'W': 623 glob_arg.busy_wait = true; 624 break; 625 626 case 'o': 627 glob_arg.stdout_interval = atoi(optarg); 628 break; 629 630 case 's': 631 glob_arg.syslog_interval = atoi(optarg); 632 break; 633 634 case 'h': 635 usage(); 636 return 0; 637 break; 638 639 default: 640 D("bad option %c %s", ch, optarg); 641 usage(); 642 return 1; 643 } 644 } 645 646 if (glob_arg.ifname[0] == '\0') { 647 D("missing interface name"); 648 usage(); 649 return 1; 650 } 651 652 /* extract the base name */ 653 char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ? 654 glob_arg.ifname : glob_arg.ifname + 7; 655 strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1); 656 for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++) 657 ; 658 *nscan = '\0'; 659 660 if (glob_arg.num_groups == 0) 661 parse_pipes(""); 662 663 if (glob_arg.syslog_interval) { 664 setlogmask(LOG_UPTO(LOG_INFO)); 665 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1); 666 } 667 668 uint32_t npipes = glob_arg.output_rings; 669 670 671 pthread_t stat_thread; 672 673 ports = calloc(npipes + 1, sizeof(struct port_des)); 674 if (!ports) { 675 D("failed to allocate the stats array"); 676 return 1; 677 } 678 struct port_des *rxport = &ports[npipes]; 679 init_groups(); 680 681 memset(&counters_buf, 0, sizeof(counters_buf)); 682 counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs)); 683 if (!counters_buf.ctrs) { 684 D("failed to allocate the counters snapshot buffer"); 685 return 1; 686 } 687 688 /* we need base_req to specify pipes and extra bufs */ 689 struct nmreq base_req; 690 memset(&base_req, 0, sizeof(base_req)); 691 692 base_req.nr_arg1 = npipes; 693 base_req.nr_arg3 = glob_arg.extra_bufs; 694 695 rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL); 696 697 if (rxport->nmd == NULL) { 698 D("cannot open %s", glob_arg.ifname); 699 return (1); 700 } else { 701 D("successfully opened %s 
(tx rings: %u)", glob_arg.ifname, 702 rxport->nmd->req.nr_tx_slots); 703 } 704 705 uint32_t extra_bufs = rxport->nmd->req.nr_arg3; 706 struct overflow_queue *oq = NULL; 707 /* reference ring to access the buffers */ 708 rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0); 709 710 if (!glob_arg.extra_bufs) 711 goto run; 712 713 D("obtained %d extra buffers", extra_bufs); 714 if (!extra_bufs) 715 goto run; 716 717 /* one overflow queue for each output pipe, plus one for the 718 * free extra buffers 719 */ 720 oq = calloc(npipes + 1, sizeof(struct overflow_queue)); 721 if (!oq) { 722 D("failed to allocated overflow queues descriptors"); 723 goto run; 724 } 725 726 freeq = &oq[npipes]; 727 rxport->oq = freeq; 728 729 freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 730 if (!freeq->slots) { 731 D("failed to allocate the free list"); 732 } 733 freeq->size = extra_bufs; 734 snprintf(freeq->name, MAX_IFNAMELEN, "free queue"); 735 736 /* 737 * the list of buffers uses the first uint32_t in each buffer 738 * as the index of the next buffer. 739 */ 740 uint32_t scan; 741 for (scan = rxport->nmd->nifp->ni_bufs_head; 742 scan; 743 scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan)) 744 { 745 struct netmap_slot s; 746 s.len = s.flags = 0; 747 s.ptr = 0; 748 s.buf_idx = scan; 749 ND("freeq <- %d", s.buf_idx); 750 oq_enq(freeq, &s); 751 } 752 753 754 if (freeq->n != extra_bufs) { 755 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d", 756 extra_bufs, freeq->n); 757 return 1; 758 } 759 rxport->nmd->nifp->ni_bufs_head = 0; 760 761 run: 762 atexit(free_buffers); 763 764 int j, t = 0; 765 for (j = 0; j < glob_arg.num_groups; j++) { 766 struct group_des *g = &groups[j]; 767 int k; 768 for (k = 0; k < g->nports; ++k) { 769 struct port_des *p = &g->ports[k]; 770 snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d", 771 (strncmp(g->pipename, "vale", 4) ? 
"netmap:" : ""), 772 g->pipename, g->first_id + k, 773 rxport->nmd->req.nr_arg2); 774 D("opening pipe named %s", p->interface); 775 776 p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd); 777 778 if (p->nmd == NULL) { 779 D("cannot open %s", p->interface); 780 return (1); 781 } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) { 782 D("failed to open pipe #%d in zero-copy mode, " 783 "please close any application that uses either pipe %s}%d, " 784 "or %s{%d, and retry", 785 k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k); 786 return (1); 787 } else { 788 D("successfully opened pipe #%d %s (tx slots: %d)", 789 k + 1, p->interface, p->nmd->req.nr_tx_slots); 790 p->ring = NETMAP_TXRING(p->nmd->nifp, 0); 791 p->last_tail = nm_ring_next(p->ring, p->ring->tail); 792 } 793 D("zerocopy %s", 794 (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled"); 795 796 if (extra_bufs) { 797 struct overflow_queue *q = &oq[t + k]; 798 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 799 if (!q->slots) { 800 D("failed to allocate overflow queue for pipe %d", k); 801 /* make all overflow queue management fail */ 802 extra_bufs = 0; 803 } 804 q->size = extra_bufs; 805 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k); 806 p->oq = q; 807 } 808 } 809 t += g->nports; 810 } 811 812 if (glob_arg.extra_bufs && !extra_bufs) { 813 if (oq) { 814 for (i = 0; i < npipes + 1; i++) { 815 free(oq[i].slots); 816 oq[i].slots = NULL; 817 } 818 free(oq); 819 oq = NULL; 820 } 821 D("*** overflow queues disabled ***"); 822 } 823 824 sleep(glob_arg.wait_link); 825 826 /* start stats thread after wait_link */ 827 if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) { 828 D("unable to create the stats thread: %s", strerror(errno)); 829 return 1; 830 } 831 832 struct pollfd pollfd[npipes + 1]; 833 memset(&pollfd, 0, sizeof(pollfd)); 834 signal(SIGINT, sigint_h); 835 836 /* make sure we wake up as often as needed, even when there are no 837 * 
packets coming in 838 */ 839 if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout) 840 poll_timeout = glob_arg.syslog_interval; 841 if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout) 842 poll_timeout = glob_arg.stdout_interval; 843 844 while (!do_abort) { 845 u_int polli = 0; 846 iter++; 847 848 for (i = 0; i < npipes; ++i) { 849 struct netmap_ring *ring = ports[i].ring; 850 int pending = nm_tx_pending(ring); 851 852 /* if there are packets pending, we want to be notified when 853 * tail moves, so we let cur=tail 854 */ 855 ring->cur = pending ? ring->tail : ring->head; 856 857 if (!glob_arg.busy_wait && !pending) { 858 /* no need to poll, there are no packets pending */ 859 continue; 860 } 861 pollfd[polli].fd = ports[i].nmd->fd; 862 pollfd[polli].events = POLLOUT; 863 pollfd[polli].revents = 0; 864 ++polli; 865 } 866 867 pollfd[polli].fd = rxport->nmd->fd; 868 pollfd[polli].events = POLLIN; 869 pollfd[polli].revents = 0; 870 ++polli; 871 872 //RD(5, "polling %d file descriptors", polli+1); 873 rv = poll(pollfd, polli, poll_timeout); 874 if (rv <= 0) { 875 if (rv < 0 && errno != EAGAIN && errno != EINTR) 876 RD(1, "poll error %s", strerror(errno)); 877 goto send_stats; 878 } 879 880 /* if there are several groups, try pushing released packets from 881 * upstream groups to the downstream ones. 882 * 883 * It is important to do this before returned slots are reused 884 * for new transmissions. For the same reason, this must be 885 * done starting from the last group going backwards. 
886 */ 887 for (i = glob_arg.num_groups - 1U; i > 0; i--) { 888 struct group_des *g = &groups[i - 1]; 889 int j; 890 891 for (j = 0; j < g->nports; j++) { 892 struct port_des *p = &g->ports[j]; 893 struct netmap_ring *ring = p->ring; 894 uint32_t last = p->last_tail, 895 stop = nm_ring_next(ring, ring->tail); 896 897 /* slight abuse of the API here: we touch the slot 898 * pointed to by tail 899 */ 900 for ( ; last != stop; last = nm_ring_next(ring, last)) { 901 struct netmap_slot *rs = &ring->slot[last]; 902 // XXX less aggressive? 903 rs->buf_idx = forward_packet(g + 1, rs); 904 rs->flags |= NS_BUF_CHANGED; 905 rs->ptr = 0; 906 } 907 p->last_tail = last; 908 } 909 } 910 911 912 913 if (oq) { 914 /* try to push packets from the overflow queues 915 * to the corresponding pipes 916 */ 917 for (i = 0; i < npipes; i++) { 918 struct port_des *p = &ports[i]; 919 struct overflow_queue *q = p->oq; 920 uint32_t j, lim; 921 struct netmap_ring *ring; 922 struct netmap_slot *slot; 923 924 if (oq_empty(q)) 925 continue; 926 ring = p->ring; 927 lim = nm_ring_space(ring); 928 if (!lim) 929 continue; 930 if (q->n < lim) 931 lim = q->n; 932 for (j = 0; j < lim; j++) { 933 struct netmap_slot s = oq_deq(q), tmp; 934 tmp.ptr = 0; 935 slot = &ring->slot[ring->head]; 936 tmp.buf_idx = slot->buf_idx; 937 oq_enq(freeq, &tmp); 938 *slot = s; 939 slot->flags |= NS_BUF_CHANGED; 940 ring->head = nm_ring_next(ring, ring->head); 941 } 942 } 943 } 944 945 /* push any new packets from the input port to the first group */ 946 int batch = 0; 947 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 948 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 949 950 //D("prepare to scan rings"); 951 int next_head = rxring->head; 952 struct netmap_slot *next_slot = &rxring->slot[next_head]; 953 const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 954 while (!nm_ring_empty(rxring)) { 955 struct netmap_slot *rs = next_slot; 956 struct group_des *g = 
&groups[0]; 957 ++received_pkts; 958 received_bytes += rs->len; 959 960 // CHOOSE THE CORRECT OUTPUT PIPE 961 rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); 962 if (rs->ptr == 0) { 963 non_ip++; // XXX ?? 964 } 965 // prefetch the buffer for the next round 966 next_head = nm_ring_next(rxring, next_head); 967 next_slot = &rxring->slot[next_head]; 968 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 969 __builtin_prefetch(next_buf); 970 // 'B' is just a hashing seed 971 rs->buf_idx = forward_packet(g, rs); 972 rs->flags |= NS_BUF_CHANGED; 973 rxring->head = rxring->cur = next_head; 974 975 batch++; 976 if (unlikely(batch >= glob_arg.batch)) { 977 ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL); 978 batch = 0; 979 } 980 ND(1, 981 "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f", 982 forwarded, dropped, 983 ((float)dropped / (float)forwarded * 100)); 984 } 985 986 } 987 988 send_stats: 989 if (counters_buf.status == COUNTERS_FULL) 990 continue; 991 /* take a new snapshot of the counters */ 992 gettimeofday(&counters_buf.ts, NULL); 993 for (i = 0; i < npipes; i++) { 994 struct my_ctrs *c = &counters_buf.ctrs[i]; 995 *c = ports[i].ctr; 996 /* 997 * If there are overflow queues, copy the number of them for each 998 * port to the ctrs.oq_n variable for each port. 999 */ 1000 if (ports[i].oq != NULL) 1001 c->oq_n = ports[i].oq->n; 1002 } 1003 counters_buf.received_pkts = received_pkts; 1004 counters_buf.received_bytes = received_bytes; 1005 counters_buf.non_ip = non_ip; 1006 if (freeq != NULL) 1007 counters_buf.freeq_n = freeq->n; 1008 __sync_synchronize(); 1009 counters_buf.status = COUNTERS_FULL; 1010 } 1011 1012 /* 1013 * If freeq exists, copy the number to the freeq_n member of the 1014 * message struct, otherwise set it to 0. 1015 */ 1016 if (freeq != NULL) { 1017 freeq_n = freeq->n; 1018 } else { 1019 freeq_n = 0; 1020 } 1021 1022 pthread_join(stat_thread, NULL); 1023 1024 printf("%"PRIu64" packets forwarded. 
%"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded, 1025 dropped, forwarded + dropped); 1026 return 0; 1027 } 1028