1 /* Show the top 'n' flows from a libtrace source
2  *
3  */
4 #define __STDC_FORMAT_MACROS 1
5 #include "config.h"
6 #include "libtrace.h"
7 #include <stdio.h>
8 #include <getopt.h>
9 #include <stdlib.h>
10 #include <map>
11 #include <queue>
12 #include <inttypes.h>
13 #include <sys/socket.h>
14 #include <netdb.h>
15 #include <string.h>
16 #include <assert.h>
17 #ifdef HAVE_NETPACKET_PACKET_H
18 #include <netpacket/packet.h>
19 #include <net/ethernet.h>
20 #else
21 #include <net/if_dl.h>
22 #endif
23 
24 #if HAVE_NCURSES_NCURSES_H
25 #include <ncurses/ncurses.h>
26 #else
27 #include <ncurses.h>
28 #endif
29 
/* How the per-flow volume column is rendered. */
typedef enum {
	BITS_PER_SEC,
	BYTES,
	PERCENT
} display_t;

/* Current display mode; changed by -B/-P and by the b/B/% keys. */
display_t display_as = BYTES;

/* Seconds of trace time between two reports. */
float interval = 2;

/* Trace timestamp at which the last report was drawn. */
double last_report = 0;

/* Which fields take part in the flow key (toggled with 1..5). */
bool use_sip      = true;
bool use_sport    = true;
bool use_dip      = true;
bool use_dport    = true;
bool use_protocol = true;

/* Run-time switches. */
bool quit         = false;	/* set when the user asks to leave */
bool fullspeed    = false;	/* -F / --fast */
bool wide_display = false;	/* -w / --wide */

/* Totals accumulated since the last report. */
uint64_t total_bytes   = 0;
uint64_t total_packets = 0;
46 
/*
 * Three-way comparison of two IPv6 socket addresses.
 *
 * The (raw, network byte order) port is the primary key; the 16 address
 * bytes break ties.  Returns <0, 0 or >0 in the usual memcmp() manner.
 */
int cmp_sockaddr_in6(const struct sockaddr_in6 *a, const struct sockaddr_in6 *b)
{
	/* Ports are 16 bits wide, so their difference always fits in an int. */
	const int port_delta = a->sin6_port - b->sin6_port;

	if (port_delta == 0) {
		return memcmp(a->sin6_addr.s6_addr,
				b->sin6_addr.s6_addr,
				sizeof(a->sin6_addr.s6_addr));
	}
	return port_delta;
}
53 
/*
 * Three-way comparison of two IPv4 socket addresses.
 *
 * The port is the primary key and the address breaks ties; both are compared
 * as the raw values stored in the structures (network byte order).
 *
 * Returns a negative value, zero or a positive value when `a` sorts before,
 * equal to or after `b`.
 */
int cmp_sockaddr_in(const struct sockaddr_in *a, const struct sockaddr_in *b)
{
	/* Ports are 16 bits wide, so their difference always fits in an int. */
	if (a->sin_port != b->sin_port)
		return a->sin_port - b->sin_port;

	/*
	 * s_addr is a 32-bit unsigned value: subtracting two of them wraps
	 * around and the result, squeezed into an int, can carry the wrong
	 * sign.  That makes the ordering inconsistent, so compare explicitly.
	 */
	if (a->sin_addr.s_addr < b->sin_addr.s_addr)
		return -1;
	if (a->sin_addr.s_addr > b->sin_addr.s_addr)
		return 1;
	return 0;
}
60 
61 #ifdef HAVE_NETPACKET_PACKET_H
/*
 * Compare the hardware addresses of two packet-socket addresses.
 *
 * Only the first b->sll_halen bytes of each address are examined.
 * Returns <0, 0 or >0 like memcmp().
 */
int cmp_sockaddr_ll(const struct sockaddr_ll *a, const struct sockaddr_ll *b)
{
	const size_t hw_len = b->sll_halen;

	return memcmp(a->sll_addr, b->sll_addr, hw_len);
}
66 #else
/*
 * Compare the link-level data of two AF_LINK socket addresses.
 *
 * Only the first b->sdl_alen bytes of sdl_data are examined.
 * Returns <0, 0 or >0 like memcmp().
 */
int cmp_sockaddr_dl(const struct sockaddr_dl *a, const struct sockaddr_dl *b)
{
	const size_t addr_len = b->sdl_alen;

	return memcmp(a->sdl_data, b->sdl_data, addr_len);
}
71 
72 #endif
73 
74 
cmp_sockaddr(const struct sockaddr * a,const struct sockaddr * b)75 int cmp_sockaddr(const struct sockaddr *a, const struct sockaddr *b)
76 {
77 	if (a->sa_family != b->sa_family) {
78 		return a->sa_family - b->sa_family;
79 	}
80 	switch (a->sa_family) {
81 		case AF_INET:
82 			return cmp_sockaddr_in((struct sockaddr_in *)a,(struct sockaddr_in*)b);
83 		case AF_INET6:
84 			return cmp_sockaddr_in6((struct sockaddr_in6 *)a,(struct sockaddr_in6*)b);
85 #ifdef HAVE_NETPACKET_PACKET_H
86 		case AF_PACKET:
87 			return cmp_sockaddr_ll((struct sockaddr_ll *)a,(struct sockaddr_ll*)b);
88 #else
89 		case AF_LINK:
90 			return cmp_sockaddr_dl((struct sockaddr_dl *)a, (struct sockaddr_dl *)b);
91 #endif
92 		case AF_UNSPEC:
93 			return 0; /* Can't compare UNSPEC's! */
94 		default:
95 			fprintf(stderr,"Don't know how to compare family %d\n",a->sa_family);
96 			abort();
97 	}
98 }
99 
trace_sockaddr2string(const struct sockaddr * a,socklen_t salen,char * buffer,size_t bufflen)100 char *trace_sockaddr2string(const struct sockaddr *a, socklen_t salen, char *buffer, size_t bufflen)
101 {
102 	static char intbuffer[NI_MAXHOST];
103 	char *mybuf = buffer ? buffer : intbuffer;
104 	size_t mybufflen = buffer ? bufflen : sizeof(intbuffer);
105 	int err;
106 
107 	/* Some systems (FreeBSD and Solaris, I'm looking at you) have a bug
108 	 * where they can't deal with the idea of a sockaddr_storage being
109 	 * passed into getnameinfo. Linux just deals by looking
110 	 * at sa_family and figuring out what sockaddr it is really.
111 	 *
112 	 * Anyway, the fix appears to be to manually hax the sockaddr length
113 	 * to be the right value for the underlying family.
114 	 */
115 	switch (a->sa_family) {
116 		case AF_INET:
117 			salen = sizeof(struct sockaddr_in);
118 			if ((err=getnameinfo(a, salen, mybuf, mybufflen, NULL, 0, NI_NUMERICHOST))!=0) {
119 				strncpy(mybuf,gai_strerror(err),mybufflen);
120 			}
121 			break;
122 		case AF_INET6:
123 			salen = sizeof(struct sockaddr_in6);
124 			if ((err=getnameinfo(a, salen, mybuf, mybufflen, NULL, 0, NI_NUMERICHOST))!=0) {
125 				strncpy(mybuf,gai_strerror(err),mybufflen);
126 			}
127 			break;
128 #ifdef HAVE_NETPACKET_PACKET_H
129 		case AF_PACKET:
130 			trace_ether_ntoa(((struct sockaddr_ll*)a)->sll_addr, mybuf);
131 			break;
132 #else
133 		case AF_LINK:
134 			trace_ether_ntoa((uint8_t *)((struct sockaddr_dl *)a)->sdl_data, mybuf);
135 			break;
136 #endif
137 		default:
138 			snprintf(mybuf,mybufflen,"Unknown family %d",a->sa_family);
139 	}
140 	return mybuf;
141 }
142 
/*
 * Store `port` (host byte order) into an IPv4 or IPv6 socket address,
 * converting it to network byte order.  Other families are left untouched.
 */
static void set_port_for_sockaddr(struct sockaddr *sa,uint16_t port)
{
	const uint16_t wire_port = htons(port);

	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *v4 = (struct sockaddr_in *)sa;
		v4->sin_port = wire_port;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *v6 = (struct sockaddr_in6 *)sa;
		v6->sin6_port = wire_port;
	}
}
154 
/*
 * Zero the address part of an IPv4 or IPv6 socket address, leaving the
 * family and port as they are.  Other families are left untouched.
 */
static void clear_addr_for_sockaddr(struct sockaddr *sa)
{
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *v4 = (struct sockaddr_in *)sa;
		v4->sin_addr.s_addr = 0;
		return;
	}

	if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *v6 = (struct sockaddr_in6 *)sa;
		memset((void *)&v6->sin6_addr, 0, sizeof(v6->sin6_addr));
	}
}
166 
/*
 * Read the port of an IPv4 or IPv6 socket address, converted to host byte
 * order.  Any other family yields 0.
 */
static uint16_t get_port_from_sockaddr(struct sockaddr *sa)
{
	if (sa->sa_family == AF_INET) {
		const struct sockaddr_in *v4 = (const struct sockaddr_in *)sa;
		return ntohs(v4->sin_port);
	}

	if (sa->sa_family == AF_INET6) {
		const struct sockaddr_in6 *v6 = (const struct sockaddr_in6 *)sa;
		return ntohs(v6->sin6_port);
	}

	return 0;
}
180 
181 struct flowkey_t {
182 	struct sockaddr_storage sip;
183 	struct sockaddr_storage dip;
184 	uint16_t sport;
185 	uint16_t dport;
186 	uint8_t protocol;
187 
operator <flowkey_t188 	bool operator <(const flowkey_t &b) const {
189 		int c;
190 
191 		if (use_sip) {
192 			c = cmp_sockaddr((struct sockaddr*)&sip,(struct sockaddr*)&b.sip);
193 			if (c != 0) return c<0;
194 		}
195 		if (use_dip) {
196 			c = cmp_sockaddr((struct sockaddr*)&dip,(struct sockaddr*)&b.dip);
197 			if (c != 0) return c<0;
198 		}
199 
200 		return protocol < b.protocol;
201 	}
202 };
203 
/* Counters kept for every flow in the flow table. */
struct flowdata_t {
	uint64_t packets;	/* packets seen for this flow */
	uint64_t bytes;		/* bytes seen for this flow */
};
208 
209 typedef std::map<flowkey_t,flowdata_t> flows_t;
210 
211 flows_t flows;
212 
/*
 * Format a rate given in bytes per second as a human readable bits per
 * second string with three decimals and a T/G/M/k prefix where the rate
 * exceeds the corresponding power of ten.
 *
 * The result lives in a static buffer that the next call overwrites.
 */
const char *nice_bandwidth(double bytespersec)
{
	static char ret[1024];
	const double bits = bytespersec*8;

	if (bits > 1e12) {
		snprintf(ret,sizeof(ret),"%.03fTb/s", bits/1e12);
		return ret;
	}
	if (bits > 1e9) {
		snprintf(ret,sizeof(ret),"%.03fGb/s", bits/1e9);
		return ret;
	}
	if (bits > 1e6) {
		snprintf(ret,sizeof(ret),"%.03fMb/s", bits/1e6);
		return ret;
	}
	if (bits > 1e3) {
		snprintf(ret,sizeof(ret),"%.03fkb/s", bits/1e3);
		return ret;
	}

	snprintf(ret,sizeof(ret),"%.03fb/s", bits);
	return ret;
}
230 
per_packet(libtrace_packet_t * packet)231 static void per_packet(libtrace_packet_t *packet)
232 {
233 	flowkey_t flowkey;
234 	flows_t::iterator it;
235 
236 	if (trace_get_source_address(packet,(struct sockaddr*)&flowkey.sip)==NULL)
237 		flowkey.sip.ss_family = AF_UNSPEC;
238 
239 	if (trace_get_destination_address(packet,(struct sockaddr*)&flowkey.dip)==NULL)
240 		flowkey.dip.ss_family = AF_UNSPEC;
241 
242 	if (!use_sip)
243 		clear_addr_for_sockaddr((struct sockaddr *)&flowkey.sip);
244 
245 	if (!use_dip)
246 		clear_addr_for_sockaddr((struct sockaddr *)&flowkey.dip);
247 
248 	if (!use_sport)
249 		set_port_for_sockaddr((struct sockaddr *)&flowkey.sip,0);
250 
251 	if (!use_dport)
252 		set_port_for_sockaddr((struct sockaddr *)&flowkey.dip,0);
253 
254 	if (use_protocol && trace_get_transport(packet,&flowkey.protocol, NULL) == NULL)
255 		flowkey.protocol = 255;
256 
257 
258 	it = flows.find(flowkey);
259 	if (it == flows.end()) {
260 		flowdata_t flowdata = { 0, 0 };
261 		flows_t::value_type insdata(flowkey,flowdata);
262 		std::pair<flows_t::iterator,bool> ins= flows.insert(insdata);
263 		it = ins.first;
264 	}
265 
266 	++it->second.packets;
267 	it->second.bytes+=trace_get_wire_length(packet);
268 
269 	++total_packets;
270 	total_bytes+=trace_get_wire_length(packet);
271 
272 }
273 
/*
 * One row of the report: a flow's counters together with the parts of its
 * key that get displayed.
 */
struct flow_data_t {
	uint64_t bytes;
	uint64_t packets;
	struct sockaddr_storage sip;
	struct sockaddr_storage dip;
	uint8_t protocol;

	/* Rank by byte count; the packet count breaks ties. */
	bool operator< (const flow_data_t &b) const {
		return (bytes == b.bytes) ? (packets < b.packets)
		                          : (bytes < b.bytes);
	}
};
286 
do_report()287 static void do_report()
288 {
289 	typedef  std::priority_queue<flow_data_t> pq_t;
290 	int row,col;
291 	pq_t pq;
292 	for(flows_t::const_iterator it=flows.begin();it!=flows.end();++it) {
293 		flow_data_t data;
294 		data.bytes = it->second.bytes,
295 		data.packets = it->second.packets,
296 		data.sip = it->first.sip;
297 		data.dip = it->first.dip;
298 		data.protocol = it->first.protocol;
299 		pq.push(data);
300 	}
301 	getmaxyx(stdscr,row,col);
302 	move(0,0);
303 	printw("Total Bytes: %10" PRIu64 " (%s)\tTotal Packets: %10" PRIu64, total_bytes, nice_bandwidth(total_bytes/interval), total_packets);
304 	clrtoeol();
305 	attrset(A_REVERSE);
306 	move(1,0);
307 	if (use_sip) {
308 		printw("%*s", wide_display ? 42 : 20, "source ip");
309 		if (use_sport)
310 			printw("/");
311 		else
312 			printw("\t");
313 	}
314 	if (use_sport)
315 		printw("%s  ", "sport");
316 	if (use_dip) {
317 		printw("%*s", wide_display ? 42 : 20, "dest ip");
318 		if (use_dport)
319 			printw("/");
320 		else
321 			printw("\t");
322 	}
323 	if (use_dport)
324 		printw("%s  ", "dport");
325 	if (use_protocol)
326 		printw("%10s\t", "proto");
327 	switch(display_as) {
328 		case BYTES:
329 			printw("%7s","Bytes\t");
330 			break;
331 		case BITS_PER_SEC:
332 			printw("%14s\t","Bits/sec");
333 			break;
334 		case PERCENT:
335 			printw("%% bytes\t");
336 			break;
337 	}
338 	printw("Packets");
339 
340 	attrset(A_NORMAL);
341 	char sipstr[1024];
342 	char dipstr[1024];
343 	for(int i=1; i<row-3 && !pq.empty(); ++i) {
344 		move(i+1,0);
345 		if (use_sip) {
346 			printw("%*s", wide_display ? 42 : 20,
347 					trace_sockaddr2string(
348 						(struct sockaddr*)&pq.top().sip,
349 						sizeof(struct sockaddr_storage),
350 						sipstr,sizeof(sipstr)));
351 			if (use_sport)
352 				printw("/");
353 			else
354 				printw("\t");
355 		}
356 		if (use_sport)
357 			printw("%-5d  ", get_port_from_sockaddr((struct sockaddr*)&pq.top().sip));
358 		if (use_dip) {
359 			printw("%*s", wide_display ? 42 : 20,
360 					trace_sockaddr2string(
361 						(struct sockaddr*)&pq.top().dip,
362 						sizeof(struct sockaddr_storage),
363 						dipstr,sizeof(dipstr)));
364 			if (use_dport)
365 				printw("/");
366 			else
367 				printw("\t");
368 		}
369 		if (use_dport)
370 			printw("%-5d  ", get_port_from_sockaddr((struct sockaddr*)&pq.top().dip));
371 		if (use_protocol) {
372 			struct protoent *proto = getprotobynumber(pq.top().protocol);
373 			if (proto)
374 				printw("%-10s  ", proto->p_name);
375 			else
376 				printw("%10d  ",pq.top().protocol);
377 		}
378 		switch (display_as) {
379 			case BYTES:
380 				printw("%7" PRIu64 "\t%7" PRIu64 "\n",
381 						pq.top().bytes,
382 						pq.top().packets);
383 				break;
384 			case BITS_PER_SEC:
385 				printw("%14.03f\t%" PRIu64 "\n",
386 						8.0*pq.top().bytes/interval,
387 						pq.top().packets);
388 				break;
389 			case PERCENT:
390 				printw("%6.2f%%\t%6.2f%%\n",
391 						100.0*pq.top().bytes/total_bytes,
392 						100.0*pq.top().packets/total_packets);
393 		}
394 		pq.pop();
395 	}
396 	flows.clear();
397 	total_packets = 0;
398 	total_bytes = 0;
399 
400 	clrtobot();
401 	refresh();
402 }
403 
run_trace(libtrace_t * trace)404 static void run_trace(libtrace_t *trace)
405 {
406 	libtrace_packet_t *packet = trace_create_packet();
407 	libtrace_eventobj_t obj;
408 	fd_set rfds;
409 	struct timeval sleep_tv;
410 	struct timeval *tv = NULL;
411 
412 	do {
413 		int maxfd=0;
414 		FD_ZERO(&rfds);
415 		FD_SET(0, &rfds); /* stdin */
416 		tv=NULL;
417 		maxfd=0;
418 
419 		obj = trace_event(trace, packet);
420 		switch(obj.type) {
421 			case TRACE_EVENT_IOWAIT:
422 				FD_SET(obj.fd, &rfds);
423 				maxfd = obj.fd;
424 				break;
425 
426 			case TRACE_EVENT_SLEEP:
427 				sleep_tv.tv_sec = (int)obj.seconds;
428 				sleep_tv.tv_usec = (int)((obj.seconds - sleep_tv.tv_sec)*1000000.0);
429 
430 				tv = &sleep_tv;
431 				break;;
432 
433 			case TRACE_EVENT_TERMINATE:
434 				trace_destroy_packet(packet);
435 				return;
436 
437 			case TRACE_EVENT_PACKET:
438 				if (obj.size == -1)
439 					break;
440 				if (trace_get_seconds(packet) - last_report >= interval) {
441 					do_report();
442 
443 					last_report=trace_get_seconds(packet);
444 				}
445 				if (trace_read_packet(trace,packet) <= 0) {
446 					obj.size = -1;
447 					break;
448 				}
449 				per_packet(packet);
450 				continue;
451 		}
452 
453 		if (tv && tv->tv_sec > interval) {
454 			tv->tv_sec = (int)interval;
455 			tv->tv_usec = 0;
456 		}
457 
458 		select(maxfd+1, &rfds, 0, 0, tv);
459 		if (FD_ISSET(0, &rfds)) {
460 			switch (getch()) {
461 				case '%':
462 					display_as = PERCENT;
463 					break;
464 				case 'b':
465 					display_as = BITS_PER_SEC;
466 					break;
467 				case 'B':
468 					display_as = BYTES;
469 					break;
470 				case '\x1b': /* Escape */
471 				case 'q':
472 					quit = true;
473 					trace_destroy_packet(packet);
474 					return;
475 				case '1': use_sip 	= !use_sip; break;
476 				case '2': use_sport 	= !use_sport; break;
477 				case '3': use_dip 	= !use_dip; break;
478 				case '4': use_dport 	= !use_dport; break;
479 				case '5': use_protocol 	= !use_protocol; break;
480 			}
481 		}
482 	} while (obj.type != TRACE_EVENT_TERMINATE || obj.size == -1);
483 
484 	trace_destroy_packet(packet);
485 }
486 
/*
 * Print the command line synopsis and a description of every option that
 * main() accepts to stderr.
 *
 * argv0  program name to show in the synopsis line
 */
static void usage(char *argv0)
{
	fprintf(stderr,"usage: %s [options] libtraceuri...\n",argv0);
	fputs(	" --filter bpfexpr\n"
		" -f bpfexpr\n"
		"\t\tApply a bpf filter expression\n"
		" --snaplen snaplen\n"
		" -s snaplen\n"
		"\t\tCapture only snaplen bytes\n"
		" --promisc 0|1\n"
		" -p 0|1\n"
		"\t\tEnable/Disable promiscuous mode\n"
		" --bits-per-sec\n"
		" -B\n"
		"\t\tDisplay usage in bits per second, not bytes per second\n"
		" --percent\n"
		" -P\n"
		"\t\tDisplay usage in percentage of total usage\n"
		" --interval int\n"
		" -i int\n"
		"\t\tUpdate the display every int seconds\n"
		" --wide\n"
		" -w\n"
		"\t\tExpand IP address fields to fit IPv6 addresses\n"
		" --fast\n"
		" -F\n"
		"\t\tRun at full speed (enables libtrace's EVENT_REALTIME option)\n"
		" -1 | -2 | -3 | -4 | -5\n"
		"\t\tToggle source ip / source port / dest ip / dest port /\n"
		"\t\tprotocol as part of the flow key\n"
		" --help\n"
		" -h\n"
		"\t\tShow this usage message\n"
		" --libtrace-help\n"
		" -H\n"
		"\t\tShow the output of libtrace's trace_help()\n",
		stderr);
}
512 
main(int argc,char * argv[])513 int main(int argc, char *argv[])
514 {
515 	libtrace_t *trace;
516 	libtrace_filter_t *filter=NULL;
517 	int snaplen=-1;
518 	int promisc=-1;
519 
520 	setprotoent(1);
521 
522 	while(1) {
523 		int option_index;
524 		struct option long_options[] = {
525 			{ "filter",		1, 0, 'f' },
526 			{ "snaplen",		1, 0, 's' },
527 			{ "promisc",		1, 0, 'p' },
528 			{ "help",		0, 0, 'h' },
529 			{ "libtrace-help",	0, 0, 'H' },
530 			{ "bits-per-sec",	0, 0, 'B' },
531 			{ "percent",		0, 0, 'P' },
532 			{ "interval",		1, 0, 'i' },
533 			{ "fast",		0, 0, 'F' },
534 			{ "wide", 		0, 0, 'w' },
535 			{ NULL,			0, 0, 0 }
536 		};
537 
538 		int c= getopt_long(argc, argv, "BPf:Fs:p:hHi:w12345",
539 				long_options, &option_index);
540 
541 		if (c==-1)
542 			break;
543 
544 		switch (c) {
545 			case 'f':
546 				filter=trace_create_filter(optarg);
547 				break;
548 			case 'F':
549 				fullspeed = true;
550 				break;
551 			case 's':
552 				snaplen=atoi(optarg);
553 				break;
554 			case 'p':
555 				promisc=atoi(optarg);
556 				break;
557 			case 'H':
558 				trace_help();
559 				return 1;
560 			case 'B':
561 				display_as = BITS_PER_SEC;
562 				break;
563 			case 'P':
564 				display_as = PERCENT;
565 				break;
566 			case 'i':
567 				interval = atof(optarg);
568 				if (interval<=0) {
569 					fprintf(stderr,"Interval must be >0\n");
570 					return 1;
571 				}
572 				break;
573 			case 'w':
574 				wide_display = true;
575 				break;
576 			case '1': use_sip 	= !use_sip; break;
577 			case '2': use_sport 	= !use_sport; break;
578 			case '3': use_dip 	= !use_dip; break;
579 			case '4': use_dport 	= !use_dport; break;
580 			case '5': use_protocol 	= !use_protocol; break;
581 			default:
582 				fprintf(stderr,"Unknown option: %c\n",c);
583 				/* FALL THRU */
584 			case 'h':
585 				usage(argv[0]);
586 				return 1;
587 		}
588 	}
589 
590 	if (optind>=argc) {
591 		fprintf(stderr,"Missing input uri\n");
592 		usage(argv[0]);
593 		return 1;
594 	}
595 
596 	initscr(); cbreak(); noecho();
597 
598 	while (!quit && optind<argc) {
599 		trace = trace_create(argv[optind]);
600 		++optind;
601 
602 		if (trace_is_err(trace)) {
603 			endwin();
604 			trace_perror(trace,"Opening trace file");
605 			return 1;
606 		}
607 
608 		if (snaplen>0)
609 			if (trace_config(trace,TRACE_OPTION_SNAPLEN,&snaplen)) {
610 				trace_perror(trace,"ignoring: ");
611 			}
612 		if (filter)
613 			if (trace_config(trace,TRACE_OPTION_FILTER,filter)) {
614 				trace_perror(trace,"ignoring: ");
615 			}
616 		if (promisc!=-1) {
617 			if (trace_config(trace,TRACE_OPTION_PROMISC,&promisc)) {
618 				trace_perror(trace,"ignoring: ");
619 			}
620 		}
621 		if (fullspeed) {
622 			int flag=1;
623 			if (trace_config(trace,TRACE_OPTION_EVENT_REALTIME,&flag)) {
624 				trace_perror(trace,"Setting EVENT_REALTIME option");
625 			}
626 		}
627 
628 		if (trace_start(trace)) {
629 			endwin();
630 			trace_perror(trace,"Starting trace");
631 			trace_destroy(trace);
632 			return 1;
633 		}
634 
635 		run_trace(trace);
636 
637 		if (trace_is_err(trace)) {
638 			trace_perror(trace,"Reading packets");
639 		}
640 
641 		trace_destroy(trace);
642 	}
643 
644 	endwin();
645 	endprotoent();
646 
647 	return 0;
648 }
649