1 /* 2 * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 14 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 16 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 17 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 19 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 20 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 21 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 22 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 23 * SUCH DAMAGE. 24 */ 25 /* $FreeBSD$ */ 26 #include <ctype.h> 27 #include <errno.h> 28 #include <inttypes.h> 29 #include <libnetmap.h> 30 #include <netinet/in.h> /* htonl */ 31 #include <pthread.h> 32 #include <signal.h> 33 #include <stdbool.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <syslog.h> 38 #include <sys/ioctl.h> 39 #include <sys/poll.h> 40 #include <unistd.h> 41 42 #include "pkt_hash.h" 43 #include "ctrs.h" 44 45 46 /* 47 * use our version of header structs, rather than bringing in a ton 48 * of platform specific ones 49 */ 50 #ifndef ETH_ALEN 51 #define ETH_ALEN 6 52 #endif 53 54 struct compact_eth_hdr { 55 unsigned char h_dest[ETH_ALEN]; 56 unsigned char h_source[ETH_ALEN]; 57 u_int16_t h_proto; 58 }; 59 60 struct compact_ip_hdr { 61 u_int8_t ihl:4, version:4; 62 u_int8_t tos; 63 u_int16_t tot_len; 64 u_int16_t id; 65 u_int16_t frag_off; 66 u_int8_t ttl; 67 u_int8_t protocol; 68 u_int16_t check; 69 u_int32_t saddr; 70 u_int32_t daddr; 71 }; 72 73 struct compact_ipv6_hdr { 74 u_int8_t priority:4, version:4; 75 u_int8_t flow_lbl[3]; 76 u_int16_t payload_len; 77 u_int8_t nexthdr; 78 u_int8_t hop_limit; 79 struct in6_addr saddr; 80 struct in6_addr daddr; 81 }; 82 83 #define MAX_IFNAMELEN 64 84 #define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40) 85 #define DEF_OUT_PIPES 2 86 #define DEF_EXTRA_BUFS 0 87 #define DEF_BATCH 2048 88 #define DEF_WAIT_LINK 2 89 #define DEF_STATS_INT 600 90 #define BUF_REVOKE 150 91 #define STAT_MSG_MAXSIZE 1024 92 93 static struct { 94 char ifname[MAX_IFNAMELEN + 1]; 95 char base_name[MAX_IFNAMELEN + 1]; 96 int netmap_fd; 97 uint16_t output_rings; 98 uint16_t num_groups; 99 uint32_t extra_bufs; 100 uint16_t batch; 101 int stdout_interval; 102 int syslog_interval; 103 int wait_link; 104 bool busy_wait; 105 } glob_arg; 106 107 /* 108 * the overflow queue is a circular queue of buffers 109 */ 110 struct overflow_queue { 111 char name[MAX_IFNAMELEN + 16]; 112 struct netmap_slot *slots; 113 uint32_t head; 114 uint32_t tail; 115 uint32_t n; 116 uint32_t size; 117 }; 118 119 static struct overflow_queue *freeq; 120 121 static inline int 122 oq_full(struct overflow_queue *q) 123 { 124 return q->n >= q->size; 125 } 126 127 static inline int 128 oq_empty(struct overflow_queue *q) 129 { 130 return q->n <= 0; 131 } 132 133 static inline void 134 oq_enq(struct overflow_queue *q, const struct netmap_slot *s) 135 { 136 if (unlikely(oq_full(q))) { 137 D("%s: queue full!", q->name); 138 abort(); 139 } 140 q->slots[q->tail] = *s; 141 q->n++; 142 q->tail++; 143 if (q->tail >= q->size) 144 q->tail = 0; 145 } 146 147 static inline struct netmap_slot 148 oq_deq(struct overflow_queue *q) 149 { 150 struct netmap_slot s = q->slots[q->head]; 151 if (unlikely(oq_empty(q))) { 152 D("%s: queue empty!", q->name); 153 abort(); 154 } 155 q->n--; 156 q->head++; 157 if (q->head >= q->size) 158 q->head = 0; 159 return s; 160 } 161 162 static volatile int do_abort = 0; 163 164 static uint64_t dropped = 0; 165 static uint64_t forwarded = 0; 166 static uint64_t received_bytes = 0; 167 static uint64_t received_pkts = 0; 168 static uint64_t non_ip = 0; 169 static uint32_t freeq_n = 0; 170 171 struct port_des { 172 char interface[MAX_PORTNAMELEN]; 173 struct my_ctrs ctr; 174 unsigned int last_sync; 175 uint32_t last_tail; 176 struct overflow_queue *oq; 177 struct nmport_d *nmd; 178 struct netmap_ring *ring; 179 struct group_des *group; 180 }; 181 182 static struct port_des *ports; 183 184 /* each group of pipes receives all the packets */ 185 struct group_des { 186 char pipename[MAX_IFNAMELEN]; 187 struct port_des *ports; 188 int first_id; 189 int nports; 190 int last; 191 int custom_port; 192 }; 193 194 static struct group_des *groups; 195 196 /* statistcs */ 197 struct counters { 198 struct timeval ts; 199 struct my_ctrs *ctrs; 200 uint64_t received_pkts; 201 uint64_t received_bytes; 202 uint64_t non_ip; 203 uint32_t freeq_n; 204 int status __attribute__((aligned(64))); 205 #define COUNTERS_EMPTY 0 206 #define COUNTERS_FULL 1 207 }; 208 209 static struct counters counters_buf; 210 211 static void * 212 print_stats(void *arg) 213 { 214 int npipes = glob_arg.output_rings; 215 int sys_int = 0; 216 (void)arg; 217 struct my_ctrs cur, prev; 218 struct my_ctrs *pipe_prev; 219 220 pipe_prev = calloc(npipes, sizeof(struct my_ctrs)); 221 if (pipe_prev == NULL) { 222 D("out of memory"); 223 exit(1); 224 } 225 226 char stat_msg[STAT_MSG_MAXSIZE] = ""; 227 228 memset(&prev, 0, sizeof(prev)); 229 while (!do_abort) { 230 int j, dosyslog = 0, dostdout = 0, newdata; 231 uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0; 232 struct my_ctrs x; 233 234 counters_buf.status = COUNTERS_EMPTY; 235 newdata = 0; 236 memset(&cur, 0, sizeof(cur)); 237 sleep(1); 238 if (counters_buf.status == COUNTERS_FULL) { 239 __sync_synchronize(); 240 newdata = 1; 241 cur.t = counters_buf.ts; 242 if (prev.t.tv_sec || prev.t.tv_usec) { 243 usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 + 244 cur.t.tv_usec - prev.t.tv_usec; 245 } 246 } 247 248 ++sys_int; 249 if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0) 250 dostdout = 1; 251 if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0) 252 dosyslog = 1; 253 254 for (j = 0; j < npipes; ++j) { 255 struct my_ctrs *c = &counters_buf.ctrs[j]; 256 cur.pkts += c->pkts; 257 cur.drop += c->drop; 258 cur.drop_bytes += c->drop_bytes; 259 cur.bytes += c->bytes; 260 261 if (usec) { 262 x.pkts = c->pkts - pipe_prev[j].pkts; 263 x.drop = c->drop - pipe_prev[j].drop; 264 x.bytes = c->bytes - pipe_prev[j].bytes; 265 x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes; 266 pps = (x.pkts*1000000 + usec/2) / usec; 267 dps = (x.drop*1000000 + usec/2) / usec; 268 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 269 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 270 } 271 pipe_prev[j] = *c; 272 273 if ( (dosyslog || dostdout) && newdata ) 274 snprintf(stat_msg, STAT_MSG_MAXSIZE, 275 "{" 276 "\"ts\":%.6f," 277 "\"interface\":\"%s\"," 278 "\"output_ring\":%" PRIu16 "," 279 "\"packets_forwarded\":%" PRIu64 "," 280 "\"packets_dropped\":%" PRIu64 "," 281 "\"data_forward_rate_Mbps\":%.4f," 282 "\"data_drop_rate_Mbps\":%.4f," 283 "\"packet_forward_rate_kpps\":%.4f," 284 "\"packet_drop_rate_kpps\":%.4f," 285 "\"overflow_queue_size\":%" PRIu32 286 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 287 ports[j].interface, 288 j, 289 c->pkts, 290 c->drop, 291 (double)bps / 1024 / 1024, 292 (double)dbps / 1024 / 1024, 293 (double)pps / 1000, 294 (double)dps / 1000, 295 c->oq_n); 296 297 if (dosyslog && stat_msg[0]) 298 syslog(LOG_INFO, "%s", stat_msg); 299 if (dostdout && stat_msg[0]) 300 printf("%s\n", stat_msg); 301 } 302 if (usec) { 303 x.pkts = cur.pkts - prev.pkts; 304 x.drop = cur.drop - prev.drop; 305 x.bytes = cur.bytes - prev.bytes; 306 x.drop_bytes = cur.drop_bytes - prev.drop_bytes; 307 pps = (x.pkts*1000000 + usec/2) / usec; 308 dps = (x.drop*1000000 + usec/2) / usec; 309 bps = ((x.bytes*1000000 + usec/2) / usec) * 8; 310 dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; 311 } 312 313 if ( (dosyslog || dostdout) && newdata ) 314 snprintf(stat_msg, STAT_MSG_MAXSIZE, 315 "{" 316 "\"ts\":%.6f," 317 "\"interface\":\"%s\"," 318 "\"output_ring\":null," 319 "\"packets_received\":%" PRIu64 "," 320 "\"packets_forwarded\":%" PRIu64 "," 321 "\"packets_dropped\":%" PRIu64 "," 322 "\"non_ip_packets\":%" PRIu64 "," 323 "\"data_forward_rate_Mbps\":%.4f," 324 "\"data_drop_rate_Mbps\":%.4f," 325 "\"packet_forward_rate_kpps\":%.4f," 326 "\"packet_drop_rate_kpps\":%.4f," 327 "\"free_buffer_slots\":%" PRIu32 328 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), 329 glob_arg.ifname, 330 received_pkts, 331 cur.pkts, 332 cur.drop, 333 counters_buf.non_ip, 334 (double)bps / 1024 / 1024, 335 (double)dbps / 1024 / 1024, 336 (double)pps / 1000, 337 (double)dps / 1000, 338 counters_buf.freeq_n); 339 340 if (dosyslog && stat_msg[0]) 341 syslog(LOG_INFO, "%s", stat_msg); 342 if (dostdout && stat_msg[0]) 343 printf("%s\n", stat_msg); 344 345 prev = cur; 346 } 347 348 free(pipe_prev); 349 350 return NULL; 351 } 352 353 static void 354 free_buffers(void) 355 { 356 int i, tot = 0; 357 struct port_des *rxport = &ports[glob_arg.output_rings]; 358 359 /* build a netmap free list with the buffers in all the overflow queues */ 360 for (i = 0; i < glob_arg.output_rings + 1; i++) { 361 struct port_des *cp = &ports[i]; 362 struct overflow_queue *q = cp->oq; 363 364 if (!q) 365 continue; 366 367 while (q->n) { 368 struct netmap_slot s = oq_deq(q); 369 uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx); 370 371 *b = rxport->nmd->nifp->ni_bufs_head; 372 rxport->nmd->nifp->ni_bufs_head = s.buf_idx; 373 tot++; 374 } 375 } 376 D("added %d buffers to netmap free list", tot); 377 378 for (i = 0; i < glob_arg.output_rings + 1; ++i) { 379 nmport_close(ports[i].nmd); 380 } 381 } 382 383 384 static void sigint_h(int sig) 385 { 386 (void)sig; /* UNUSED */ 387 do_abort = 1; 388 signal(SIGINT, SIG_DFL); 389 } 390 391 static void 392 usage(void) 393 { 394 printf("usage: lb [options]\n"); 395 printf("where options are:\n"); 396 printf(" -h view help text\n"); 397 printf(" -i iface interface name (required)\n"); 398 printf(" -p [prefix:]npipes add a new group of output pipes\n"); 399 printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS); 400 printf(" -b batch batch size (default: %d)\n", DEF_BATCH); 401 printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK); 402 printf(" -W enable busy waiting. this will run your CPU at 100%%\n"); 403 printf(" -s seconds seconds between syslog stats messages (default: 0)\n"); 404 printf(" -o seconds seconds between stdout stats messages (default: 0)\n"); 405 exit(0); 406 } 407 408 static int 409 parse_pipes(const char *spec) 410 { 411 const char *end = index(spec, ':'); 412 static int max_groups = 0; 413 struct group_des *g; 414 415 ND("spec %s num_groups %d", spec, glob_arg.num_groups); 416 if (max_groups < glob_arg.num_groups + 1) { 417 size_t size = sizeof(*g) * (glob_arg.num_groups + 1); 418 groups = realloc(groups, size); 419 if (groups == NULL) { 420 D("out of memory"); 421 return 1; 422 } 423 } 424 g = &groups[glob_arg.num_groups]; 425 memset(g, 0, sizeof(*g)); 426 427 if (end != NULL) { 428 if (end - spec > MAX_IFNAMELEN - 8) { 429 D("name '%s' too long", spec); 430 return 1; 431 } 432 if (end == spec) { 433 D("missing prefix before ':' in '%s'", spec); 434 return 1; 435 } 436 strncpy(g->pipename, spec, end - spec); 437 g->custom_port = 1; 438 end++; 439 } else { 440 /* no prefix, this group will use the 441 * name of the input port. 442 * This will be set in init_groups(), 443 * since here the input port may still 444 * be uninitialized 445 */ 446 end = spec; 447 } 448 if (*end == '\0') { 449 g->nports = DEF_OUT_PIPES; 450 } else { 451 g->nports = atoi(end); 452 if (g->nports < 1) { 453 D("invalid number of pipes '%s' (must be at least 1)", end); 454 return 1; 455 } 456 } 457 glob_arg.output_rings += g->nports; 458 glob_arg.num_groups++; 459 return 0; 460 } 461 462 /* complete the initialization of the groups data structure */ 463 static void 464 init_groups(void) 465 { 466 int i, j, t = 0; 467 struct group_des *g = NULL; 468 for (i = 0; i < glob_arg.num_groups; i++) { 469 g = &groups[i]; 470 g->ports = &ports[t]; 471 for (j = 0; j < g->nports; j++) 472 g->ports[j].group = g; 473 t += g->nports; 474 if (!g->custom_port) 475 strcpy(g->pipename, glob_arg.base_name); 476 for (j = 0; j < i; j++) { 477 struct group_des *h = &groups[j]; 478 if (!strcmp(h->pipename, g->pipename)) 479 g->first_id += h->nports; 480 } 481 } 482 g->last = 1; 483 } 484 485 486 /* To support packets that span multiple slots (NS_MOREFRAG) we 487 * need to make sure of the following: 488 * 489 * - all fragments of the same packet must go to the same output pipe 490 * - when dropping, all fragments of the same packet must be dropped 491 * 492 * For the former point we remember and reuse the last hash computed 493 * in each input ring, and only update it when NS_MOREFRAG was not 494 * set in the last received slot (this marks the start of a new packet). 495 * 496 * For the latter point, we only update the output ring head pointer 497 * when an entire packet has been forwarded. We keep a shadow_head 498 * pointer to know where to put the next partial fragment and, 499 * when the need to drop arises, we roll it back to head. 500 */ 501 struct morefrag { 502 uint16_t last_flag; /* for input rings */ 503 uint32_t last_hash; /* for input rings */ 504 uint32_t shadow_head; /* for output rings */ 505 }; 506 507 /* push the packet described by slot rs to the group g. 508 * This may cause other buffers to be pushed down the 509 * chain headed by g. 510 * Return a free buffer. 511 */ 512 static uint32_t 513 forward_packet(struct group_des *g, struct netmap_slot *rs) 514 { 515 uint32_t hash = rs->ptr; 516 uint32_t output_port = hash % g->nports; 517 struct port_des *port = &g->ports[output_port]; 518 struct netmap_ring *ring = port->ring; 519 struct overflow_queue *q = port->oq; 520 struct morefrag *mf = (struct morefrag *)ring->sem; 521 uint16_t curmf = rs->flags & NS_MOREFRAG; 522 523 /* Move the packet to the output pipe, unless there is 524 * either no space left on the ring, or there is some 525 * packet still in the overflow queue (since those must 526 * take precedence over the new one) 527 */ 528 if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) { 529 struct netmap_slot *ts = &ring->slot[mf->shadow_head]; 530 struct netmap_slot old_slot = *ts; 531 532 ts->buf_idx = rs->buf_idx; 533 ts->len = rs->len; 534 ts->flags = rs->flags | NS_BUF_CHANGED; 535 ts->ptr = rs->ptr; 536 mf->shadow_head = nm_ring_next(ring, mf->shadow_head); 537 if (!curmf) { 538 ring->head = mf->shadow_head; 539 } 540 ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u", 541 curmf, ts->flags, mf->shadow_head, ring->head, ring->tail); 542 port->ctr.bytes += rs->len; 543 port->ctr.pkts++; 544 forwarded++; 545 return old_slot.buf_idx; 546 } 547 548 /* use the overflow queue, if available */ 549 if (q == NULL || oq_full(q)) { 550 uint32_t scan; 551 /* no space left on the ring and no overflow queue 552 * available: we are forced to drop the packet 553 */ 554 555 /* drop previous fragments, if any */ 556 for (scan = ring->head; scan != mf->shadow_head; 557 scan = nm_ring_next(ring, scan)) { 558 struct netmap_slot *ts = &ring->slot[scan]; 559 dropped++; 560 port->ctr.drop_bytes += ts->len; 561 } 562 mf->shadow_head = ring->head; 563 564 dropped++; 565 port->ctr.drop++; 566 port->ctr.drop_bytes += rs->len; 567 return rs->buf_idx; 568 } 569 570 oq_enq(q, rs); 571 572 /* 573 * we cannot continue down the chain and we need to 574 * return a free buffer now. We take it from the free queue. 575 */ 576 if (oq_empty(freeq)) { 577 /* the free queue is empty. Revoke some buffers 578 * from the longest overflow queue 579 */ 580 uint32_t j; 581 struct port_des *lp = &ports[0]; 582 uint32_t max = lp->oq->n; 583 584 /* let lp point to the port with the longest queue */ 585 for (j = 1; j < glob_arg.output_rings; j++) { 586 struct port_des *cp = &ports[j]; 587 if (cp->oq->n > max) { 588 lp = cp; 589 max = cp->oq->n; 590 } 591 } 592 593 /* move the oldest BUF_REVOKE buffers from the 594 * lp queue to the free queue 595 * 596 * We cannot revoke a partially received packet. 597 * To make thinks simple we make sure to leave 598 * at least NETMAP_MAX_FRAGS slots in the queue. 599 */ 600 for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) { 601 struct netmap_slot tmp = oq_deq(lp->oq); 602 603 dropped++; 604 lp->ctr.drop++; 605 lp->ctr.drop_bytes += tmp.len; 606 607 oq_enq(freeq, &tmp); 608 } 609 610 ND(1, "revoked %d buffers from %s", j, lq->name); 611 } 612 613 return oq_deq(freeq).buf_idx; 614 } 615 616 int main(int argc, char **argv) 617 { 618 int ch; 619 uint32_t i; 620 int rv; 621 int poll_timeout = 10; /* default */ 622 623 glob_arg.ifname[0] = '\0'; 624 glob_arg.output_rings = 0; 625 glob_arg.batch = DEF_BATCH; 626 glob_arg.wait_link = DEF_WAIT_LINK; 627 glob_arg.busy_wait = false; 628 glob_arg.syslog_interval = 0; 629 glob_arg.stdout_interval = 0; 630 631 while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) { 632 switch (ch) { 633 case 'i': 634 D("interface is %s", optarg); 635 if (strlen(optarg) > MAX_IFNAMELEN - 8) { 636 D("ifname too long %s", optarg); 637 return 1; 638 } 639 if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) { 640 sprintf(glob_arg.ifname, "netmap:%s", optarg); 641 } else { 642 strcpy(glob_arg.ifname, optarg); 643 } 644 break; 645 646 case 'p': 647 if (parse_pipes(optarg)) { 648 usage(); 649 return 1; 650 } 651 break; 652 653 case 'B': 654 glob_arg.extra_bufs = atoi(optarg); 655 D("requested %d extra buffers", glob_arg.extra_bufs); 656 break; 657 658 case 'b': 659 glob_arg.batch = atoi(optarg); 660 D("batch is %d", glob_arg.batch); 661 break; 662 663 case 'w': 664 glob_arg.wait_link = atoi(optarg); 665 D("link wait for up time is %d", glob_arg.wait_link); 666 break; 667 668 case 'W': 669 glob_arg.busy_wait = true; 670 break; 671 672 case 'o': 673 glob_arg.stdout_interval = atoi(optarg); 674 break; 675 676 case 's': 677 glob_arg.syslog_interval = atoi(optarg); 678 break; 679 680 case 'h': 681 usage(); 682 return 0; 683 break; 684 685 default: 686 D("bad option %c %s", ch, optarg); 687 usage(); 688 return 1; 689 } 690 } 691 692 if (glob_arg.ifname[0] == '\0') { 693 D("missing interface name"); 694 usage(); 695 return 1; 696 } 697 698 if (glob_arg.num_groups == 0) 699 parse_pipes(""); 700 701 if (glob_arg.syslog_interval) { 702 setlogmask(LOG_UPTO(LOG_INFO)); 703 openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1); 704 } 705 706 uint32_t npipes = glob_arg.output_rings; 707 708 709 pthread_t stat_thread; 710 711 ports = calloc(npipes + 1, sizeof(struct port_des)); 712 if (!ports) { 713 D("failed to allocate the stats array"); 714 return 1; 715 } 716 struct port_des *rxport = &ports[npipes]; 717 718 rxport->nmd = nmport_prepare(glob_arg.ifname); 719 if (rxport->nmd == NULL) { 720 D("cannot parse %s", glob_arg.ifname); 721 return (1); 722 } 723 /* extract the base name */ 724 strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN); 725 726 init_groups(); 727 728 memset(&counters_buf, 0, sizeof(counters_buf)); 729 counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs)); 730 if (!counters_buf.ctrs) { 731 D("failed to allocate the counters snapshot buffer"); 732 return 1; 733 } 734 735 rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs; 736 737 if (nmport_open_desc(rxport->nmd) < 0) { 738 D("cannot open %s", glob_arg.ifname); 739 return (1); 740 } 741 D("successfully opened %s", glob_arg.ifname); 742 743 uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs; 744 struct overflow_queue *oq = NULL; 745 /* reference ring to access the buffers */ 746 rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0); 747 748 if (!glob_arg.extra_bufs) 749 goto run; 750 751 D("obtained %d extra buffers", extra_bufs); 752 if (!extra_bufs) 753 goto run; 754 755 /* one overflow queue for each output pipe, plus one for the 756 * free extra buffers 757 */ 758 oq = calloc(npipes + 1, sizeof(struct overflow_queue)); 759 if (!oq) { 760 D("failed to allocated overflow queues descriptors"); 761 goto run; 762 } 763 764 freeq = &oq[npipes]; 765 rxport->oq = freeq; 766 767 freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 768 if (!freeq->slots) { 769 D("failed to allocate the free list"); 770 } 771 freeq->size = extra_bufs; 772 snprintf(freeq->name, MAX_IFNAMELEN, "free queue"); 773 774 /* 775 * the list of buffers uses the first uint32_t in each buffer 776 * as the index of the next buffer. 777 */ 778 uint32_t scan; 779 for (scan = rxport->nmd->nifp->ni_bufs_head; 780 scan; 781 scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan)) 782 { 783 struct netmap_slot s; 784 s.len = s.flags = 0; 785 s.ptr = 0; 786 s.buf_idx = scan; 787 ND("freeq <- %d", s.buf_idx); 788 oq_enq(freeq, &s); 789 } 790 791 792 if (freeq->n != extra_bufs) { 793 D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d", 794 extra_bufs, freeq->n); 795 return 1; 796 } 797 rxport->nmd->nifp->ni_bufs_head = 0; 798 799 run: 800 atexit(free_buffers); 801 802 int j, t = 0; 803 for (j = 0; j < glob_arg.num_groups; j++) { 804 struct group_des *g = &groups[j]; 805 int k; 806 for (k = 0; k < g->nports; ++k) { 807 struct port_des *p = &g->ports[k]; 808 snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d", 809 (strncmp(g->pipename, "vale", 4) ? "netmap:" : ""), 810 g->pipename, g->first_id + k, 811 rxport->nmd->reg.nr_mem_id); 812 D("opening pipe named %s", p->interface); 813 814 p->nmd = nmport_open(p->interface); 815 816 if (p->nmd == NULL) { 817 D("cannot open %s", p->interface); 818 return (1); 819 } else if (p->nmd->mem != rxport->nmd->mem) { 820 D("failed to open pipe #%d in zero-copy mode, " 821 "please close any application that uses either pipe %s}%d, " 822 "or %s{%d, and retry", 823 k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k); 824 return (1); 825 } else { 826 struct morefrag *mf; 827 828 D("successfully opened pipe #%d %s (tx slots: %d)", 829 k + 1, p->interface, p->nmd->reg.nr_tx_slots); 830 p->ring = NETMAP_TXRING(p->nmd->nifp, 0); 831 p->last_tail = nm_ring_next(p->ring, p->ring->tail); 832 mf = (struct morefrag *)p->ring->sem; 833 mf->last_flag = 0; /* unused */ 834 mf->last_hash = 0; /* unused */ 835 mf->shadow_head = p->ring->head; 836 } 837 D("zerocopy %s", 838 (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled"); 839 840 if (extra_bufs) { 841 struct overflow_queue *q = &oq[t + k]; 842 q->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); 843 if (!q->slots) { 844 D("failed to allocate overflow queue for pipe %d", k); 845 /* make all overflow queue management fail */ 846 extra_bufs = 0; 847 } 848 q->size = extra_bufs; 849 snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k); 850 p->oq = q; 851 } 852 } 853 t += g->nports; 854 } 855 856 if (glob_arg.extra_bufs && !extra_bufs) { 857 if (oq) { 858 for (i = 0; i < npipes + 1; i++) { 859 free(oq[i].slots); 860 oq[i].slots = NULL; 861 } 862 free(oq); 863 oq = NULL; 864 } 865 D("*** overflow queues disabled ***"); 866 } 867 868 sleep(glob_arg.wait_link); 869 870 /* start stats thread after wait_link */ 871 if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) { 872 D("unable to create the stats thread: %s", strerror(errno)); 873 return 1; 874 } 875 876 struct pollfd pollfd[npipes + 1]; 877 memset(&pollfd, 0, sizeof(pollfd)); 878 signal(SIGINT, sigint_h); 879 880 /* make sure we wake up as often as needed, even when there are no 881 * packets coming in 882 */ 883 if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout) 884 poll_timeout = glob_arg.syslog_interval; 885 if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout) 886 poll_timeout = glob_arg.stdout_interval; 887 888 /* initialize the morefrag structures for the input rings */ 889 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 890 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 891 struct morefrag *mf = (struct morefrag *)rxring->sem; 892 893 mf->last_flag = 0; 894 mf->last_hash = 0; 895 mf->shadow_head = 0; /* unused */ 896 } 897 898 while (!do_abort) { 899 u_int polli = 0; 900 901 for (i = 0; i < npipes; ++i) { 902 struct netmap_ring *ring = ports[i].ring; 903 int pending = nm_tx_pending(ring); 904 905 /* if there are packets pending, we want to be notified when 906 * tail moves, so we let cur=tail 907 */ 908 ring->cur = pending ? ring->tail : ring->head; 909 910 if (!glob_arg.busy_wait && !pending) { 911 /* no need to poll, there are no packets pending */ 912 continue; 913 } 914 pollfd[polli].fd = ports[i].nmd->fd; 915 pollfd[polli].events = POLLOUT; 916 pollfd[polli].revents = 0; 917 ++polli; 918 } 919 920 pollfd[polli].fd = rxport->nmd->fd; 921 pollfd[polli].events = POLLIN; 922 pollfd[polli].revents = 0; 923 ++polli; 924 925 ND(5, "polling %d file descriptors", polli); 926 rv = poll(pollfd, polli, poll_timeout); 927 if (rv <= 0) { 928 if (rv < 0 && errno != EAGAIN && errno != EINTR) 929 RD(1, "poll error %s", strerror(errno)); 930 goto send_stats; 931 } 932 933 /* if there are several groups, try pushing released packets from 934 * upstream groups to the downstream ones. 935 * 936 * It is important to do this before returned slots are reused 937 * for new transmissions. For the same reason, this must be 938 * done starting from the last group going backwards. 939 */ 940 for (i = glob_arg.num_groups - 1U; i > 0; i--) { 941 struct group_des *g = &groups[i - 1]; 942 943 for (j = 0; j < g->nports; j++) { 944 struct port_des *p = &g->ports[j]; 945 struct netmap_ring *ring = p->ring; 946 uint32_t last = p->last_tail, 947 stop = nm_ring_next(ring, ring->tail); 948 949 /* slight abuse of the API here: we touch the slot 950 * pointed to by tail 951 */ 952 for ( ; last != stop; last = nm_ring_next(ring, last)) { 953 struct netmap_slot *rs = &ring->slot[last]; 954 // XXX less aggressive? 955 rs->buf_idx = forward_packet(g + 1, rs); 956 rs->flags = NS_BUF_CHANGED; 957 rs->ptr = 0; 958 } 959 p->last_tail = last; 960 } 961 } 962 963 964 965 if (oq) { 966 /* try to push packets from the overflow queues 967 * to the corresponding pipes 968 */ 969 for (i = 0; i < npipes; i++) { 970 struct port_des *p = &ports[i]; 971 struct overflow_queue *q = p->oq; 972 uint32_t k; 973 int64_t lim; 974 struct netmap_ring *ring; 975 struct netmap_slot *slot; 976 struct morefrag *mf; 977 978 if (oq_empty(q)) 979 continue; 980 ring = p->ring; 981 mf = (struct morefrag *)ring->sem; 982 lim = ring->tail - mf->shadow_head; 983 if (!lim) 984 continue; 985 if (lim < 0) 986 lim += ring->num_slots; 987 if (q->n < lim) 988 lim = q->n; 989 for (k = 0; k < lim; k++) { 990 struct netmap_slot s = oq_deq(q), tmp; 991 tmp.ptr = 0; 992 slot = &ring->slot[mf->shadow_head]; 993 tmp.buf_idx = slot->buf_idx; 994 oq_enq(freeq, &tmp); 995 *slot = s; 996 slot->flags |= NS_BUF_CHANGED; 997 mf->shadow_head = nm_ring_next(ring, mf->shadow_head); 998 if (!(slot->flags & NS_MOREFRAG)) 999 ring->head = mf->shadow_head; 1000 } 1001 } 1002 } 1003 1004 /* push any new packets from the input port to the first group */ 1005 int batch = 0; 1006 for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { 1007 struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); 1008 struct morefrag *mf = (struct morefrag *)rxring->sem; 1009 1010 //D("prepare to scan rings"); 1011 int next_head = rxring->head; 1012 struct netmap_slot *next_slot = &rxring->slot[next_head]; 1013 const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 1014 while (!nm_ring_empty(rxring)) { 1015 struct netmap_slot *rs = next_slot; 1016 struct group_des *g = &groups[0]; 1017 ++received_pkts; 1018 received_bytes += rs->len; 1019 1020 // CHOOSE THE CORRECT OUTPUT PIPE 1021 // If the previous slot had NS_MOREFRAG set, this is another 1022 // fragment of the last packet and it should go to the same 1023 // output pipe as before. 1024 if (!mf->last_flag) { 1025 // 'B' is just a hashing seed 1026 mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); 1027 } 1028 mf->last_flag = rs->flags & NS_MOREFRAG; 1029 rs->ptr = mf->last_hash; 1030 if (rs->ptr == 0) { 1031 non_ip++; // XXX ?? 1032 } 1033 // prefetch the buffer for the next round 1034 next_head = nm_ring_next(rxring, next_head); 1035 next_slot = &rxring->slot[next_head]; 1036 next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); 1037 __builtin_prefetch(next_buf); 1038 rs->buf_idx = forward_packet(g, rs); 1039 rs->flags = NS_BUF_CHANGED; 1040 rxring->head = rxring->cur = next_head; 1041 1042 batch++; 1043 if (unlikely(batch >= glob_arg.batch)) { 1044 ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL); 1045 batch = 0; 1046 } 1047 ND(1, 1048 "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f", 1049 forwarded, dropped, 1050 ((float)dropped / (float)forwarded * 100)); 1051 } 1052 1053 } 1054 1055 send_stats: 1056 if (counters_buf.status == COUNTERS_FULL) 1057 continue; 1058 /* take a new snapshot of the counters */ 1059 gettimeofday(&counters_buf.ts, NULL); 1060 for (i = 0; i < npipes; i++) { 1061 struct my_ctrs *c = &counters_buf.ctrs[i]; 1062 *c = ports[i].ctr; 1063 /* 1064 * If there are overflow queues, copy the number of them for each 1065 * port to the ctrs.oq_n variable for each port. 1066 */ 1067 if (ports[i].oq != NULL) 1068 c->oq_n = ports[i].oq->n; 1069 } 1070 counters_buf.received_pkts = received_pkts; 1071 counters_buf.received_bytes = received_bytes; 1072 counters_buf.non_ip = non_ip; 1073 if (freeq != NULL) 1074 counters_buf.freeq_n = freeq->n; 1075 __sync_synchronize(); 1076 counters_buf.status = COUNTERS_FULL; 1077 } 1078 1079 /* 1080 * If freeq exists, copy the number to the freeq_n member of the 1081 * message struct, otherwise set it to 0. 1082 */ 1083 if (freeq != NULL) { 1084 freeq_n = freeq->n; 1085 } else { 1086 freeq_n = 0; 1087 } 1088 1089 pthread_join(stat_thread, NULL); 1090 1091 printf("%"PRIu64" packets forwarded. %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded, 1092 dropped, forwarded + dropped); 1093 return 0; 1094 } 1095