1 /*
2  *  Copyright (c) 2009-2020, Peter Haag
3  *  Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4  *  All rights reserved.
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright notice,
12  *     this list of conditions and the following disclaimer in the documentation
13  *     and/or other materials provided with the distribution.
14  *   * Neither the name of the author nor the names of its contributors may be
15  *     used to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  *  POSSIBILITY OF SUCH DAMAGE.
29  *
30  */
31 
32 #include "config.h"
33 
34 #include <stdio.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <stdarg.h>
38 #include <errno.h>
39 #include <time.h>
40 #include <string.h>
41 #include <ctype.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <sys/socket.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <sys/resource.h>
50 
51 #ifdef HAVE_STDINT_H
52 #include <stdint.h>
53 #endif
54 
55 #ifdef HAVE_STDIO_EXT_H
56 #include <stdio_ext.h>
57 #endif
58 
59 #include "util.h"
60 #include "nfdump.h"
61 #include "nffile.h"
62 #include "nfx.h"
63 #include "flist.h"
64 #include "nftree.h"
65 #include "nfnet.h"
66 #include "bookkeeper.h"
67 #include "collector.h"
68 #include "exporter.h"
69 #include "netflow_v5_v7.h"
70 #include "netflow_v9.h"
71 #include "nfprof.h"
72 
/* Defaults used when -p / -H are not given on the command line */
#define DEFAULTCISCOPORT "9995"
#define DEFAULTHOSTNAME "127.0.0.1"

/*
 * FPURGE(stream): drop data buffered on a stdio stream.
 *
 * Exactly one definition is selected. fpurge() takes precedence over
 * __fpurge() when config.h reports both, which matches the previous
 * "last definition wins" behaviour without redefining the macro.
 * When neither function is available, FPURGE() expands to a no-op so that
 * callers still compile.
 */
#undef	FPURGE
#if defined(HAVE_FPURGE)
#define	FPURGE	fpurge
#elif defined(HAVE___FPURGE)
#define	FPURGE	__fpurge
#else
#define	FPURGE(stream)	((void)0)
#endif
83 
84 /* Global Variables */
85 FilterEngine_t *Engine;
86 static int verbose;
87 
88 /* Local Variables */
89 static const char *nfdump_version = VERSION;
90 
91 static send_peer_t peer;
92 
93 extension_map_list_t *extension_map_list;
94 
95 /* Function Prototypes */
96 static void usage(char *name);
97 
98 static void send_blast(unsigned int delay );
99 
100 static void send_data(char *rfile, time_t twin_start, time_t twin_end, uint32_t count,
101 				unsigned int delay,  int confirm, int netflow_version, int distribution);
102 
103 static int FlushBuffer(int confirm);
104 
105 /* Functions */
106 
107 #include "nffile_inline.c"
108 #include "nfdump_inline.c"
109 
/*
 * usage - print the synopsis and the option summary to stdout.
 *
 * name: program name inserted into the synopsis line (main() passes argv[0]).
 *
 * The default given for -d matches the initial value of 'delay' in main().
 */
static void usage(char *name) {
		printf("usage %s [options] [\"filter\"]\n"
					"-h\t\tthis text you see right here\n"
					"-V\t\tPrint version and exit.\n"
					"-E\t\tPrint verbose messages. For debugging purpose only.\n"
					"-H <Host/ip>\tTarget IP address default: 127.0.0.1\n"
					"-j <mcast>\tSend packets to multicast group\n"
					"-4\t\tForce IPv4 protocol.\n"
					"-6\t\tForce IPv6 protocol.\n"
					"-L <log>\tLog to syslog facility <log>\n"
					"-p <port>\tTarget port default 9995\n"
					"-d <usec>\tDelay in usec between packets. default 1\n"
					"-c <cnt>\tPacket count. default send all packets\n"
					"-b <bsize>\tSend buffer size.\n"
					"-r <input>\tread from file. default: stdin\n"
					"-f <filter>\tfilter syntaxfile\n"
					"-v <version>\tUse netflow version to send flows. Either 5 or 9\n"
					"-z <distribution>\tSimulate real time distribution with coefficient\n"
					"-t <time>\ttime window for sending packets\n"
					"\t\tyyyy/MM/dd.hh:mm:ss[-yyyy/MM/dd.hh:mm:ss]\n"
					, name);
} /* usage */
132 
FlushBuffer(int confirm)133 static int FlushBuffer(int confirm) {
134 size_t len = (pointer_addr_t)peer.buff_ptr - (pointer_addr_t)peer.send_buffer;
135 static unsigned long cnt = 1;
136 
137 	peer.flush = 0;
138 	peer.buff_ptr = peer.send_buffer;
139 	if ( confirm ) {
140 		FPURGE(stdin);
141 		printf("Press any key to send next UDP packet [%lu] ", cnt++);
142 		fflush(stdout);
143 		fgetc(stdin);
144 	}
145 	return sendto(peer.sockfd, peer.send_buffer, len, 0, (struct sockaddr *)&(peer.addr), peer.addrlen);
146 } // End of FlushBuffer
147 
148 
send_blast(unsigned int delay)149 static void send_blast(unsigned int delay ) {
150 common_flow_header_t	*header;
151 nfprof_t    			profile_data;
152 int						i, ret;
153 u_long	 				usec, sec;
154 double 					fps;
155 
156 	peer.send_buffer = malloc(1400);
157 	if ( !peer.send_buffer ) {
158 		perror("Memory allocation error");
159 		close(peer.sockfd);
160 		return;
161 	}
162 	header = (common_flow_header_t *)peer.send_buffer;
163 	header->version = htons(255);
164 	nfprof_start(&profile_data);
165 	for ( i = 0; i < 65535; i++ ) {
166 		header->count = htons(i);
167 		ret = sendto(peer.sockfd, peer.send_buffer, 1400, 0, (struct sockaddr *)&peer.addr, peer.addrlen);
168 		if ( ret < 0 || ret != 1400 ) {
169 			perror("Error sending data");
170 		}
171 
172 		if ( delay ) {
173 			// sleep as specified
174 			usleep(delay);
175 		}
176 	}
177 	nfprof_end(&profile_data, 8*65535*1400);
178 
179 
180 	usec = profile_data.used.ru_utime.tv_usec + profile_data.used.ru_stime.tv_usec;
181 	sec = profile_data.used.ru_utime.tv_sec + profile_data.used.ru_stime.tv_sec;
182 
183 	if (usec > 1000000)
184 		usec -= 1000000, ++sec;
185 
186 	if (profile_data.tend.tv_usec < profile_data.tstart.tv_usec)
187 		profile_data.tend.tv_usec += 1000000, --profile_data.tend.tv_sec;
188 
189 	usec = profile_data.tend.tv_usec - profile_data.tstart.tv_usec;
190 	sec = profile_data.tend.tv_sec - profile_data.tstart.tv_sec;
191 	fps = (double)profile_data.numflows / ((double)sec + ((double)usec/1000000));
192 
193 	fprintf(stdout, "Wall: %lu.%-3.3lus bps: %-10.1f\n", sec, usec/1000, fps);
194 
195 
196 } // End of send_blast
197 
send_data(char * rfile,time_t twin_start,time_t twin_end,uint32_t count,unsigned int delay,int confirm,int netflow_version,int distribution)198 static void send_data(char *rfile, time_t twin_start,
199 			time_t twin_end, uint32_t count, unsigned int delay, int confirm, int netflow_version, int distribution) {
200 master_record_t	master_record;
201 common_record_t	*flow_record;
202 nffile_t		*nffile;
203 int 			i, done, ret, again;
204 uint32_t		numflows, cnt;
205 
206 	// z-parameter variables
207 	struct timeval todayTime, currentTime;
208 	double today = 0, reftime = 0;
209 	int reducer = 0;
210 
211 	// Get the first file handle
212 	nffile = GetNextFile(NULL, twin_start, twin_end);
213 	if ( !nffile ) {
214 		LogError("GetNextFile() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
215 		return;
216 	}
217 	if ( nffile == EMPTY_LIST ) {
218 		LogError("Empty file list. No files to process\n");
219 		return;
220 	}
221 
222 	peer.send_buffer   	= malloc(UDP_PACKET_SIZE);
223 	peer.flush			= 0;
224 	if ( !peer.send_buffer ) {
225 		LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
226 		CloseFile(nffile);
227 		DisposeFile(nffile);
228 		return;
229 	}
230 	peer.buff_ptr = peer.send_buffer;
231 	peer.endp  	  = (void *)((pointer_addr_t)peer.send_buffer + UDP_PACKET_SIZE - 1);
232 
233 	if ( netflow_version == 5 )
234 		Init_v5_v7_output(&peer);
235 	else
236 		Init_v9_output(&peer);
237 
238 	numflows	= 0;
239 	done	 	= 0;
240 
241 	// setup Filter Engine to point to master_record, as any record read from file
242 	// is expanded into this record
243 	Engine->nfrecord = (uint64_t *)&master_record;
244 
245 	cnt = 0;
246 	while ( !done ) {
247 		// get next data block from file
248 		ret = ReadBlock(nffile);
249 
250 		switch (ret) {
251 			case NF_CORRUPT:
252 			case NF_ERROR:
253 				if ( ret == NF_CORRUPT )
254 					LogError("Skip corrupt data file '%s'\n",GetCurrentFilename());
255 				else
256 					LogError("Read error in file '%s': %s\n",GetCurrentFilename(), strerror(errno) );
257 				// fall through - get next file in chain
258 			case NF_EOF: {
259 				nffile_t *next = GetNextFile(nffile, twin_start, twin_end);
260 				if ( next == EMPTY_LIST ) {
261 					done = 1;
262 				}
263 				if ( next == NULL ) {
264 					done = 1;
265 					LogError("Unexpected end of file list\n");
266 				}
267 				// else continue with next file
268 				continue;
269 
270 				} break; // not really needed
271 		}
272 
273 		if ( nffile->block_header->id != DATA_BLOCK_TYPE_2 ) {
274 			LogError("Can't process block type %u. Skip block.\n", nffile->block_header->id);
275 			continue;
276 		}
277 
278 		// cnt is the number of blocks, which survived the filter
279 		// and added to the output buffer
280 		flow_record = nffile->buff_ptr;
281 		uint32_t sumSize = 0;
282 		for ( i=0; i < nffile->block_header->NumRecords; i++ ) {
283 			int match;
284 			if ( (sumSize + flow_record->size) > ret ) {
285 				LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
286 				exit(255);
287 			}
288 			sumSize += flow_record->size;
289 
290 			switch ( flow_record->type ) {
291 				case CommonRecordType: {
292 					if ( extension_map_list->slot[flow_record->ext_map] == NULL ) {
293 						LogError("Corrupt data file. Missing extension map %u. Skip record.\n", flow_record->ext_map);
294 						flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
295 						continue;
296 					}
297 
298 					// if no filter is given, the result is always true
299 					ExpandRecord_v2( flow_record, extension_map_list->slot[flow_record->ext_map], NULL, &master_record);
300 
301 					match = twin_start && (master_record.first < twin_start || master_record.last > twin_end) ? 0 : 1;
302 
303 					// filter netflow record with user supplied filter
304 					if ( match )
305 						match = (*Engine->FilterEngine)(Engine);
306 
307 					if ( match == 0 ) { // record failed to pass all filters
308 						// increment pointer by number of bytes for netflow record
309 						flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
310 						// go to next record
311 						continue;
312 					}
313 					// Records passed filter -> continue record processing
314 
315 					if ( netflow_version == 5 )
316 						again = Add_v5_output_record(&master_record, &peer);
317 					else
318 						again = Add_v9_output_record(&master_record, &peer);
319 
320 					cnt++;
321 					numflows++;
322 
323 					if ( peer.flush ) {
324 						int err = FlushBuffer(confirm);
325 
326 						if ( err < 0 ) {
327 							perror("Error sending data");
328 							CloseFile(nffile);
329 							DisposeFile(nffile);
330 							return;
331 						}
332 
333 						if ( delay ) {
334 							// sleep as specified
335 							usleep(delay);
336 						}
337 						cnt = 0;
338 					}
339 
340 					if ( again ) {
341 						if ( netflow_version == 5 )
342 							Add_v5_output_record(&master_record, &peer);
343 						else
344 							Add_v9_output_record(&master_record, &peer);
345 						cnt++;
346 					}
347 
348 					} break;
349 				case ExtensionMapType: {
350 					extension_map_t *map = (extension_map_t *)flow_record;
351 
352 					int ret = Insert_Extension_Map(extension_map_list, map);
353 					switch (ret) {
354 						case 0:
355 							break; // map already known and flushed
356 						case 1:
357 							break; // new map
358 						default:
359 							LogError("Corrupt data file. Unable to decode at %s line %d\n", __FILE__, __LINE__);
360 							exit(255);
361 					}
362 
363 					} break;
364 				case LegacyRecordType1:
365 				case LegacyRecordType2:
366 				case ExporterInfoRecordType:
367 				case ExporterStatRecordType:
368 				case SamplerInfoRecordype:
369 						// Silently skip exporter/sampler records
370 					break;
371 			 	default: {
372 					LogError("Skip unknown record type %i\n", flow_record->type);
373 				}
374 			}
375 
376 			// z-parameter
377 			//first and last are line (tstart and tend) timestamp with milliseconds
378 			// first = (double) flow_record->first + ((double)flow_record->msec_first / 1000);
379 			double last = (double) flow_record->last + ((double)flow_record->msec_last / 1000);
380 
381 			gettimeofday(&currentTime, NULL);
382 			double now =  (double)currentTime.tv_sec + (double)currentTime.tv_usec / 1000000;
383 
384 			// remove incoherent values
385 			if (reftime == 0 && last > 1000000000 && last < 2000000000){
386 				reftime = last;
387 				gettimeofday(&todayTime, NULL);
388 	 			today =  (double)todayTime.tv_sec + (double)todayTime.tv_usec / 1000000;
389 			}
390 
391 			// Reducer avoid to have too much computation: It takes 1 over 3 line to regulate sending time
392 			if (reducer % 3 == 0 && distribution != 0 && reftime != 0 && last > 1000000000){
393 				while (last - reftime > distribution * (now - today)){
394 					gettimeofday(&currentTime, NULL);
395 					now =  (double)currentTime.tv_sec + (double)currentTime.tv_usec / 1000000;
396 				}
397 			}
398 			reducer++;
399 
400 			// Advance pointer by number of bytes for netflow record
401 			flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
402 
403 		}
404 	} // while
405 
406 	// flush still remaining records
407 	if ( cnt ) {
408 		ret = FlushBuffer(confirm);
409 
410 		if ( ret < 0 ) {
411 			perror("Error sending data");
412 		}
413 
414 	} // if cnt
415 
416 	if (nffile) {
417 		CloseFile(nffile);
418 		DisposeFile(nffile);
419 	}
420 
421 	close(peer.sockfd);
422 
423 	return;
424 
425 } // End of send_data
426 
427 
main(int argc,char ** argv)428 int main( int argc, char **argv ) {
429 struct stat stat_buff;
430 char *rfile, *ffile, *filter, *tstring;
431 int c, confirm, ffd, ret, blast, netflow_version, distribution;
432 unsigned int delay, count, sockbuff_size;
433 time_t t_start, t_end;
434 
435 	rfile = ffile = filter = tstring = NULL;
436 	t_start = t_end = 0;
437 
438 	peer.hostname 	= NULL;
439 	peer.port 		= DEFAULTCISCOPORT;
440 	peer.mcast		= 0;
441 	peer.family		= AF_UNSPEC;
442 	peer.sockfd		= 0;
443 
444 	delay 	  		= 1;
445 	count	  		= 0xFFFFFFFF;
446 	sockbuff_size  	= 0;
447 	netflow_version	= 5;
448 	blast			= 0;
449 	verbose			= 0;
450 	confirm			= 0;
451 	distribution	= 0;
452 	while ((c = getopt(argc, argv, "46BhH:i:K:L:p:d:c:b:j:r:f:t:v:z:VY")) != EOF) {
453 		switch (c) {
454 			case 'h':
455 				usage(argv[0]);
456 				exit(0);
457 				break;
458 			case 'B':
459 				blast = 1;
460 				break;
461 			case 'E':
462 				verbose = 1;
463 				break;
464 			case 'V':
465 				printf("%s: Version: %s\n",argv[0], nfdump_version);
466 				exit(0);
467 				break;
468 			case 'Y':
469 				confirm = 1;
470 				break;
471 			case 'H':
472 			case 'i':	// compatibility with old version
473 				peer.hostname = strdup(optarg);
474 				peer.mcast	  = 0;
475 				break;
476 			case 'j':
477 				if ( peer.hostname == NULL ) {
478 					peer.hostname = strdup(optarg);
479 					peer.mcast	  = 1;
480 				} else {
481         			LogError("ERROR, -H(-i) and -j are mutually exclusive!!\n");
482         			exit(255);
483 				}
484 				break;
485 			case 'K':
486 				LogError("*** Anonymization moved! Use nfanon to anonymize flows first!\n");
487 				exit(255);
488 				break;
489 			case 'L':
490 				if ( !InitLog(0, argv[0], optarg, verbose) )
491 					exit(255);
492 				break;
493 			case 'p':
494 				peer.port = strdup(optarg);
495 				break;
496 			case 'd':
497 				delay = atoi(optarg);
498 				break;
499 			case 'v':
500 				netflow_version = atoi(optarg);
501 				if ( netflow_version != 5 && netflow_version != 9 ) {
502 					LogError("Invalid netflow version: %s. Accept only 5 or 9!\n", optarg);
503 					exit(255);
504 				}
505 				break;
506 			case 'c':
507 				count = atoi(optarg);
508 				break;
509 			case 'b':
510 				sockbuff_size = atoi(optarg);
511 				break;
512 			case 'f':
513 				ffile = optarg;
514 				break;
515 			case 't':
516 				tstring = optarg;
517 				break;
518 			case 'r':
519 				rfile = optarg;
520 				break;
521 			case 'z':
522 				distribution = atoi(optarg);
523 				break;
524 			case '4':
525 				if ( peer.family == AF_UNSPEC )
526 					peer.family = AF_INET;
527 				else {
528 					LogError("ERROR, Accepts only one protocol IPv4 or IPv6!\n");
529 					exit(255);
530 				}
531 				break;
532 			case '6':
533 				if ( peer.family == AF_UNSPEC )
534 					peer.family = AF_INET6;
535 				else {
536 					LogError("ERROR, Accepts only one protocol IPv4 or IPv6!\n");
537 					exit(255);
538 				}
539 				break;
540 			default:
541 				usage(argv[0]);
542 				exit(0);
543 		}
544 	}
545 	if (argc - optind > 1) {
546 		usage(argv[0]);
547 		exit(255);
548 	} else {
549 		/* user specified a pcap filter */
550 		filter = argv[optind];
551 	}
552 
553 	if ( peer.hostname == NULL )
554 		peer.hostname 	= DEFAULTHOSTNAME;
555 
556 	if ( !filter && ffile ) {
557 		if ( stat(ffile, &stat_buff) ) {
558 			perror("Can't stat file");
559 			exit(255);
560 		}
561 		filter = (char *)malloc(stat_buff.st_size);
562 		if ( !filter ) {
563 			perror("Memory error");
564 			exit(255);
565 		}
566 		ffd = open(ffile, O_RDONLY);
567 		if ( ffd < 0 ) {
568 			perror("Can't open file");
569 			exit(255);
570 		}
571 		ret = read(ffd, (void *)filter, stat_buff.st_size);
572 		if ( ret < 0   ) {
573 			perror("Error reading file");
574 			close(ffd);
575 			exit(255);
576 		}
577 		close(ffd);
578 	}
579 
580 	if ( !filter )
581 		filter = "any";
582 
583 	Engine = CompileFilter(filter);
584 	if ( !Engine )
585 		exit(254);
586 
587 	if ( peer.mcast )
588 		peer.sockfd = Multicast_send_socket (peer.hostname, peer.port, peer.family, sockbuff_size,
589 											&peer.addr, &peer.addrlen );
590 	else
591 		peer.sockfd = Unicast_send_socket (peer.hostname, peer.port, peer.family, sockbuff_size,
592 											&peer.addr, &peer.addrlen );
593 	if ( peer.sockfd <= 0 ) {
594 		exit(255);
595 	}
596 
597 	if ( blast ) {
598 		send_blast(delay );
599 		exit(0);
600 	}
601 
602 	extension_map_list = InitExtensionMaps(NEEDS_EXTENSION_LIST);
603 
604 	SetupInputFileSequence(NULL,rfile, NULL);
605 
606 	if ( tstring ) {
607 		if ( !ScanTimeFrame(tstring, &t_start, &t_end) )
608 			exit(255);
609 	}
610 
611 	send_data(rfile, t_start, t_end, count, delay, confirm, netflow_version,distribution);
612 
613 	FreeExtensionMaps(extension_map_list);
614 
615 	return 0;
616 }
617