1 /*
2  *  Copyright (c) 2009-2020, Peter Haag
3  *  Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4  *  All rights reserved.
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright notice,
12  *     this list of conditions and the following disclaimer in the documentation
13  *     and/or other materials provided with the distribution.
14  *   * Neither the name of the author nor the names of its contributors may be
15  *     used to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  *  POSSIBILITY OF SUCH DAMAGE.
29  *
30  */
31 
32 #include "config.h"
33 
34 #include <stdio.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <stdarg.h>
38 #include <errno.h>
39 #include <time.h>
40 #include <string.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <sys/param.h>
44 #include <netinet/in.h>
45 #include <fcntl.h>
46 
47 #ifdef HAVE_STDINT_H
48 #include <stdint.h>
49 #endif
50 
51 #include "util.h"
52 #include "nfdump.h"
53 #include "nftree.h"
54 #include "nffile.h"
55 #include "nfx.h"
56 #include "nfstat.h"
57 #include "nfstatfile.h"
58 #include "bookkeeper.h"
59 #include "collector.h"
60 #include "exporter.h"
61 #include "ipconv.h"
62 #include "flist.h"
63 #include "profile.h"
64 
65 /* externals */
66 extern exporter_t **exporter_list;
67 
68 /* Local Variables */
69 static const char *nfdump_version = VERSION;
70 
71 
72 extension_map_list_t *extension_map_list;
73 uint32_t is_anonymized;
74 char Ident[IDENTLEN];
75 
76 #ifdef HAVE_INFLUXDB
77 	char influxdb_url[1024]="";
78 #endif
79 
80 /* Function Prototypes */
81 static void usage(char *name);
82 
83 static profile_param_info_t *ParseParams (char *profile_datadir);
84 
85 static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot);
86 
87 /* Functions */
88 
89 #include "nfdump_inline.c"
90 #include "nffile_inline.c"
91 
/*
 * usage - print the command line synopsis to stdout.
 *
 * name: program name as invoked (argv[0]); only shown in the first line.
 *
 * The list mirrors the option cases handled in main(). The "-i" line is
 * only compiled in when HAVE_INFLUXDB is defined, matching the conditional
 * option handling in main().
 */
static void usage(char *name) {
	printf("usage %s [options] \n"
		"-h\t\tthis text you see right here\n"
		"-V\t\tPrint version and exit.\n"
		"-D <dns>\tUse nameserver <dns> for host lookup.\n"
		"-L <fac>\tLog to syslog facility <fac>.\n"
		"-I\t\tRead profile/channel parameters from stdin.\n"
		"-M <expr>\tRead input from multiple directories.\n"
		"-r <file>\tread input from file\n"
		"-f <file>\tfilename with filter syntaxfile\n"
		"-p <dir>\tprofile data dir.\n"
		"-P <dir>\tprofile stat dir.\n"
		"-Z\t\tCheck filter syntax and exit.\n"
		"-S subdir\tSub directory format. see nfcapd(1) for format\n"
		"-z\t\tCompress flows in output file.\n"
#ifdef HAVE_INFLUXDB
		"-i <influxurl>\tInfluxdb url for stats (example: http://localhost:8086/write?db=mydb&u=pippo&p=paperino)\n"
#endif
		"-t <time>\ttime for RRD update\n", name);
} /* usage */
111 
112 
/*
 * process_data - feed every record of the input file sequence to all channels.
 *
 * channels:     array of channel descriptors; each carries a filter engine,
 *               a stat record and an optional output file (nffile == NULL
 *               for shadow profiles, which only collect statistics).
 * num_channels: number of entries in channels.
 * tslot:        not referenced by this function.
 *
 * Blocks are read with ReadBlock(); on NF_EOF, NF_ERROR or NF_CORRUPT the
 * next file is requested via GetNextFile(). Flow records that pass a
 * channel's filter update the channel statistics and are appended to the
 * channel's output buffer. New extension maps, exporter and sampler records
 * are copied to every channel that has an output file.
 *
 * A record whose size is inconsistent with the block, or an extension map
 * that can not be inserted, terminates the process with exit(255).
 */
static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot) {
common_record_t	*flow_record;
nffile_t		*nffile;
FilterEngine_t	*engine;
uint32_t		i;
unsigned int	j;
int				done, ret;

	(void)tslot;

	nffile = GetNextFile(NULL, 0, 0);
	if ( !nffile ) {
		LogError("GetNextFile() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
		return;
	}
	if ( nffile == EMPTY_LIST ) {
		LogError("Empty file list. No files to process\n");
		return;
	}

	// store infos away for later use
	// although multiple files may be processed, it is assumed that all
	// have the same settings
	is_anonymized = IP_ANONYMIZED(nffile);
	strncpy(Ident, nffile->file_header->ident, IDENTLEN);
	Ident[IDENTLEN-1] = '\0';

	done = 0;
	while ( !done ) {

		// get next data block from file
		ret = ReadBlock(nffile);

		switch (ret) {
			case NF_CORRUPT:
			case NF_ERROR:
				if ( ret == NF_CORRUPT )
					LogError("Skip corrupt data file '%s'\n",GetCurrentFilename());
				else
					LogError("Read error in file '%s': %s\n",GetCurrentFilename(), strerror(errno) );
				// fall through - get next file in chain
			case NF_EOF: {
				nffile_t *next = GetNextFile(nffile, 0, 0);
				if ( next == EMPTY_LIST ) {
					done = 1;
				}
				if ( next == NULL ) {
					done = 1;
					LogError("Unexpected end of file list\n");
				}
				// re-evaluate the loop condition; there is no block to process
				continue;
				}
		}

		if ( nffile->block_header->id != DATA_BLOCK_TYPE_2 ) {
			LogError("Can't process block type %u. Skip block.\n", nffile->block_header->id);
			continue;
		}

		flow_record = nffile->buff_ptr;
		uint32_t sumSize = 0;
		for ( i=0; i < nffile->block_header->NumRecords; i++ ) {
			// The record header must lie inside the bytes returned by ReadBlock()
			// before its size field may be trusted. Any record accepted by the
			// size checks below also satisfies this first check.
			if ( (sumSize + sizeof(record_header_t)) > (uint32_t)ret ||
				 (sumSize + flow_record->size) > (uint32_t)ret ||
				 (flow_record->size < sizeof(record_header_t)) ) {
				LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
				exit(255);
			}
			sumSize += flow_record->size;

			switch ( flow_record->type ) {
				case CommonRecordType: {
					exporter_t *exp_info = exporter_list[flow_record->exporter_sysid];
					uint32_t map_id = flow_record->ext_map;
					master_record_t	*master_record;

					if ( extension_map_list->slot[map_id] == NULL ) {
						LogError("Corrupt data file. Missing extension map %u. Skip record.\n", flow_record->ext_map);
						// advance here: 'continue' skips the pointer update at the loop end
						flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
						continue;
					}

					master_record = &(extension_map_list->slot[map_id]->master_record);
					ExpandRecord_v2( flow_record, extension_map_list->slot[flow_record->ext_map],
						exp_info ? &(exp_info->info) : NULL, master_record);

					for ( j=0; j < num_channels; j++ ) {
						// apply profile filter
						engine = channels[j].engine;
						engine->nfrecord = (uint64_t *)master_record;

						// if profile filter failed -> next profile
						if ( !(*engine->FilterEngine)(engine) )
							continue;

						// filter was successful -> update statistics
						UpdateStat(&channels[j].stat_record, master_record);

						// shadow profiles do not have files - only real channels
						// get file statistics and the record itself
						if ( channels[j].nffile != NULL ) {
							UpdateStat(channels[j].nffile->stat_record, master_record);
							// write record to output buffer
							AppendToBuffer(channels[j].nffile, (void *)flow_record, flow_record->size);
						}

					} // End of for all channels

					} break;
				case ExtensionMapType: {
					extension_map_t *map = (extension_map_t *)flow_record;

					// insert exactly once: 0 = map already known, 1 = new map, else error
					int map_state = Insert_Extension_Map(extension_map_list, map);
					switch (map_state) {
						case 0:
							break; // map already known and flushed
						case 1:
							for ( j=0; j < num_channels; j++ ) {
								if ( channels[j].nffile != NULL ) {
									// flush new map
									AppendToBuffer(channels[j].nffile, (void *)map, map->size);
								}
							}
							break;
						default:
							LogError("Corrupt data file. Unable to decode at %s line %d\n", __FILE__, __LINE__);
							exit(255);
					}

					} break;
				case ExporterInfoRecordType: {
					int err = AddExporterInfo((exporter_info_record_t *)flow_record);
					if ( err == 0 ) {
						LogError("Failed to add Exporter Record\n");
					} else if ( err == 1 ) {
						// only the return value 1 causes the record to be copied
						for ( j=0; j < num_channels; j++ ) {
							if ( channels[j].nffile != NULL ) {
								// flush new exporter
								AppendToBuffer(channels[j].nffile, (void *)flow_record, flow_record->size);
							}
						}
					}
					} break;
				case SamplerInfoRecordype: {
					int err = AddSamplerInfo((sampler_info_record_t *)flow_record);
					if ( err == 0 ) {
						LogError("Failed to add Sampler Record\n");
					} else if ( err == 1 ) {
						// only the return value 1 causes the record to be copied
						for ( j=0; j < num_channels; j++ ) {
							if ( channels[j].nffile != NULL ) {
								// flush new sampler
								AppendToBuffer(channels[j].nffile, (void *)flow_record, flow_record->size);
							}
						}
					}
					} break;
				case LegacyRecordType1:
				case LegacyRecordType2:
				case ExporterStatRecordType:
					// Silently skip these record types
					break;
				default:  {
					LogError("Skip unknown record type %i\n", flow_record->type);
				}
			}
			// Advance pointer by number of bytes for netflow record
			flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);

		} // End of for all NumRecords
	} // End of while !done

	// flush pending output - shadow profiles do not have files.
	for ( j=0; j < num_channels; j++ ) {
		if ( channels[j].nffile == NULL )
			continue;

		// write the output buffer only if it holds records
		if ( channels[j].nffile->block_header->NumRecords ) {
			if ( WriteBlock(channels[j].nffile) <= 0 ) {
				LogError("Failed to write output buffer to disk: '%s'" , strerror(errno));
			}
		}
	}
	CloseFile(nffile);
	DisposeFile(nffile);

} // End of process_data
305 
ParseParams(char * profile_datadir)306 static profile_param_info_t *ParseParams (char *profile_datadir) {
307 struct stat stat_buf;
308 char line[512], path[MAXPATHLEN], *p, *q, *s;
309 profile_param_info_t *profile_list;
310 profile_param_info_t **list = &profile_list;
311 
312 	profile_list = NULL;
313 	while ( ( fgets(line, 512, stdin) != NULL )) {
314 		line[511] = '\0';
315 
316 		if ( *list == NULL )
317 			*list = (profile_param_info_t *)malloc(sizeof(profile_param_info_t));
318 		// else we come from a continue statement with illegal data - overwrite
319 
320 		if ( !*list) {
321 			LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
322 			return NULL;
323 		}
324 
325 		(*list)->next 		  = NULL;
326 		(*list)->profilegroup = NULL;
327 		(*list)->profilename  = NULL;
328 		(*list)->channelname  = NULL;
329 		(*list)->channel_sourcelist = NULL;
330 		(*list)->profiletype  = 0;
331 
332 		// delete '\n' at the end of line
333 		// format of stdin config line:
334 		// <profilegroup>#<profilename>#<profiletype>#<channelname>#<channel_sourcelist>
335 		p = strchr(line, '\n');
336 		if ( p ) *p = '\0';
337 		LogInfo("Process line '%s'\n", line);
338 
339 		q = line;
340 		p = strchr(q, '#');
341 		if ( p )
342 			*p = '\0';
343 
344 		s = line;
345 
346 		// savety check: if no separator found loop to next line
347 		if ( !p ) {
348 			LogError("Incomplete line - channel skipped.\n");
349 			continue;
350 		}
351 
352 		q = p;
353 		q++;
354 
355 		p = strchr(q, '#');
356 		if ( p )
357 			*p = '\0';
358 
359 		snprintf(path, MAXPATHLEN-1, "%s/%s/%s", profile_datadir, s, q);
360 		path[MAXPATHLEN-1] = '\0';
361 		if ( stat(path, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
362 			LogError("profile '%s' not found in group %s. Skipped.\n", q, s);
363 			continue;
364 		}
365 
366 		(*list)->profilegroup = strdup(s);
367 		(*list)->profilename  = strdup(q);
368 
369 		// savety check: if no separator found loop to next line
370 		if ( !p ) {
371 			LogError("Incomplete line - channel skipped.\n");
372 			continue;
373 		}
374 
375 		q = p;
376 		q++;
377 
378 		p = strchr(q, '#');
379 		if ( p )
380 			*p = '\0';
381 
382 		s = q;
383 		while ( *s ) {
384 			if ( *s < '0' || *s > '9' ) {
385 				LogError("Not a valid number: %s\n", q);
386 				s = NULL;
387 				break;
388 			}
389 			s++;
390 		}
391 		if ( s == NULL )
392 			continue;
393 
394 		(*list)->profiletype = (int)strtol(q, (char **)NULL, 10);
395 
396 		// savety check: if no separator found loop to next line
397 		if ( !p ) {
398 			LogError("Incomplete line - channel skipped.\n");
399 			continue;
400 		}
401 
402 		q = p;
403 		q++;
404 
405 		p = strchr(q, '#');
406 		if ( p )
407 			*p = '\0';
408 
409 		snprintf(path, MAXPATHLEN-1, "%s/%s/%s/%s", profile_datadir, (*list)->profilegroup, (*list)->profilename, q);
410 		path[MAXPATHLEN-1] = '\0';
411 		if ( stat(path, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
412 			LogError("channel '%s' in profile '%s' not found. Skipped.\n", q, (*list)->profilename);
413 			continue;
414 		}
415 
416 		(*list)->channelname = strdup(q);
417 
418 		if ( !p ) {
419 			LogError("Incomplete line - Skipped.\n");
420 			continue;
421 		}
422 
423 		q = p;
424 		q++;
425 
426 		p = strchr(q, '#');
427 		if ( p )
428 			*p = '\0';
429 
430 		// Skip leading '| chars
431 		while ( *q && *q == '|' ) {
432 			q++;
433 		}
434 		s = q;
435 
436 		// if q is already empty ( '\0' ) loop is not processed
437 		while ( *s ) {
438 			// as s[0] is not '\0' s[1] may be '\0' but still valid and in range
439 			if ( s[0] == '|' && s[1] == '|' ) {
440 				char *t = s;
441 				t++;
442 				while ( *t ) {	// delete this empty channel name
443 					t[0] = t[1];
444 					t++;
445 				}
446 			} else
447 				s++;
448 		}
449 		// we have no doublicate '|' here any more
450 		// check if last char is an extra '|'
451 		if ( *q && (q[strlen(q)-1] == '|') )
452 			q[strlen(q)-1] = '\0';
453 
454 		if ( *q && (strcmp(q, "*") != 0) )
455 			(*list)->channel_sourcelist = strdup(q);
456 
457 		list = &((*list)->next);
458 	}
459 
460 	if ( *list != NULL ) {
461 		free(*list);
462 		*list = NULL;
463 	}
464 
465 	if ( ferror(stdin) ) {
466 		LogError("fgets() error: %s", strerror(errno));
467 		return NULL;
468 	}
469 
470 	return profile_list;
471 
472 } // End of ParseParams
473 
main(int argc,char ** argv)474 int main( int argc, char **argv ) {
475 unsigned int		num_channels, compress;
476 struct stat stat_buf;
477 profile_param_info_t *profile_list;
478 char *rfile, *ffile, *filename, *Mdirs;
479 char	*profile_datadir, *profile_statdir, *nameserver;
480 int c, syntax_only, subdir_index, stdin_profile_params;
481 time_t tslot;
482 
483 	profile_datadir = NULL;
484 	profile_statdir = NULL;
485 	Mdirs 			= NULL;
486 	tslot 			= 0;
487 	syntax_only	    = 0;
488 	compress		= NOT_COMPRESSED;
489 	subdir_index	= 0;
490 	profile_list	= NULL;
491 	nameserver		= NULL;
492 	stdin_profile_params = 0;
493 	is_anonymized	= 0;
494 
495 	strncpy(Ident, "none", IDENTLEN);
496 	Ident[IDENTLEN-1] = '\0';
497 
498 	// default file names
499 	ffile = "filter.txt";
500 	rfile = NULL;
501 	while ((c = getopt(argc, argv, "D:HIL:p:P:hi:f:J;r:n:M:S:t:VzZ")) != EOF) {
502 		switch (c) {
503 			case 'h':
504 				usage(argv[0]);
505 				exit(0);
506 				break;
507 			case 'D':
508 				nameserver = optarg;
509 				if ( !set_nameserver(nameserver) ) {
510 					exit(255);
511 				}
512 				break;
513 			case 'I':
514 				stdin_profile_params = 1;
515 				break;
516 			case 'L':
517 				if ( !InitLog(0, "nfprofile", optarg, 0) )
518 					exit(255);
519 				break;
520 			case 'Z':
521 				syntax_only = 1;
522 				break;
523 			case 'p':
524 				profile_datadir = optarg;
525 				break;
526 			case 'P':
527 				profile_statdir = optarg;
528 				break;
529 			case 'S':
530 				subdir_index = atoi(optarg);
531 				break;
532 			case 'V':
533 				printf("%s: Version: %s\n",argv[0], nfdump_version);
534 				exit(0);
535 				break;
536 			case 'f':
537 				ffile = optarg;
538 				break;
539 			case 't':
540 				tslot = atoi(optarg);
541 				break;
542 			case 'M':
543 				Mdirs = optarg;
544 				break;
545 			case 'r':
546 				rfile = optarg;
547 				break;
548 			case 'j':
549 				if ( compress ) {
550 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
551 					exit(255);
552 				}
553 				compress = BZ2_COMPRESSED;
554 				break;
555 			case 'y':
556 				if ( compress ) {
557 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
558 					exit(255);
559 				}
560 				compress = LZ4_COMPRESSED;
561 				break;
562 			case 'z':
563 				if ( compress ) {
564 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
565 					exit(255);
566 				}
567 				compress = LZO_COMPRESSED;
568 				break;
569 #ifdef HAVE_INFLUXDB
570 			case 'i':
571 				if ( optarg != NULL )
572 					strncpy(influxdb_url, optarg, 1024);
573 				else {
574 					LogError("Missing argument for -i <influx URL>\n");
575 					exit(255);
576 				}
577 				influxdb_url[1023] = '\0';
578 				break;
579 #endif
580 			default:
581 				usage(argv[0]);
582 				exit(0);
583 		}
584 	}
585 
586 	if ( subdir_index && !InitHierPath(subdir_index) ) {
587 		exit(255);
588 	}
589 
590 	if ( !profile_datadir ) {
591 		LogError("Profile data directory required!\n");
592 		exit(255);
593 	}
594 
595 	if ( !profile_statdir ) {
596 		profile_statdir = profile_datadir;
597 	}
598 
599 	if ( stat(profile_datadir, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
600 		LogError("'%s' not a directory\n", profile_datadir);
601 		exit(255);
602 	}
603 
604 	if ( stdin_profile_params ) {
605 		profile_list = ParseParams(profile_datadir);
606 		if ( !profile_list ) {
607 			exit(254);
608 		}
609 	}
610 
611 	if ( syntax_only ) {
612 		filename = NULL;
613 		rfile = NULL;
614 	} else {
615 		char *p;
616 		if ( rfile == NULL ) {
617 			LogError("-r filename required!\n");
618 			exit(255);
619 		}
620 		p = strrchr(rfile, '/');
621 		filename = p == NULL ? rfile : ++p;
622 		if ( strlen(filename) == 0 ) {
623 			LogError("Filename error: zero length filename\n");
624 			exit(254);
625 		}
626 	}
627 
628 	if ( chdir(profile_datadir)) {
629 		LogError("Error can't chdir to '%s': %s", profile_datadir, strerror(errno));
630 		exit(255);
631 	}
632 
633 	num_channels = InitChannels(profile_datadir, profile_statdir, profile_list, ffile, filename, subdir_index, syntax_only, compress);
634 
635 	// nothing to do
636 	if ( num_channels == 0 ) {
637 		LogInfo("No channels to process.\n");
638 		return 0;
639 	}
640 
641 	if ( syntax_only ) {
642 		printf("Syntax check done.\n");
643 		return 0;
644 	}
645 
646 	if ( !rfile ) {
647 		LogError("Input file (-r) required!\n");
648 		exit(255);
649 	}
650 
651 	extension_map_list = InitExtensionMaps(NEEDS_EXTENSION_LIST);
652 	if ( !InitExporterList() ) {
653 		exit(255);
654 	}
655 
656 	SetupInputFileSequence(Mdirs,rfile, NULL);
657 
658 	process_data(GetChannelInfoList(), num_channels, tslot);
659 
660 	CloseChannels(tslot, compress);
661 
662 	FreeExtensionMaps(extension_map_list);
663 
664 	return 0;
665 }
666