1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #include "config.h"
33
34 #include <stdio.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <stdarg.h>
38 #include <errno.h>
39 #include <time.h>
40 #include <string.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <sys/param.h>
44 #include <netinet/in.h>
45 #include <fcntl.h>
46
47 #ifdef HAVE_STDINT_H
48 #include <stdint.h>
49 #endif
50
51 #include "util.h"
52 #include "nfdump.h"
53 #include "nftree.h"
54 #include "nffile.h"
55 #include "nfx.h"
56 #include "nfstat.h"
57 #include "nfstatfile.h"
58 #include "bookkeeper.h"
59 #include "collector.h"
60 #include "exporter.h"
61 #include "ipconv.h"
62 #include "flist.h"
63 #include "profile.h"
64
/* externals */
// Exporter table defined in another module (declared extern here);
// process_data() indexes it with a flow record's exporter_sysid.
extern exporter_t **exporter_list;

/* Local Variables */
static const char *nfdump_version = VERSION;


// Extension map table for the input data. Created in main() with
// InitExtensionMaps(); read and updated by process_data() while decoding records.
// Non-static: presumably also referenced from other modules - verify before changing linkage.
extension_map_list_t *extension_map_list;
// Anonymization flag of the input, taken from the first input file (IP_ANONYMIZED()).
uint32_t is_anonymized;
// Ident string of the input: "none" by default, replaced by the first input file's ident.
char Ident[IDENTLEN];

#ifdef HAVE_INFLUXDB
// InfluxDB write URL; stays an empty string unless set with -i.
char influxdb_url[1024]="";
#endif

/* Function Prototypes */
static void usage(char *name);

static profile_param_info_t *ParseParams (char *profile_datadir);

static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot);
86
87 /* Functions */
88
89 #include "nfdump_inline.c"
90 #include "nffile_inline.c"
91
/*
 * Print the command line help text to stdout.
 *
 * name - program name shown in the usage line (normally argv[0]).
 *
 * Lists every option main() handles, including -I and -L and all three
 * output compression options (-z, -j, -y).
 */
static void usage(char *name) {
	printf("usage %s [options] \n"
					"-h\t\tthis text you see right here\n"
					"-V\t\tPrint version and exit.\n"
					"-D <dns>\tUse nameserver <dns> for host lookup.\n"
					"-I\t\tRead profile parameters from stdin.\n"
					"-L <facility>\tSet log facility.\n"
					"-M <expr>\tRead input from multiple directories.\n"
					"-r\t\tread input from file\n"
					"-f\t\tfilename with filter syntaxfile\n"
					"-p\t\tprofile data dir.\n"
					"-P\t\tprofile stat dir.\n"
					"-s\t\tprofile subdir.\n"
					"-Z\t\tCheck filter syntax and exit.\n"
					"-S subdir\tSub directory format. see nfcapd(1) for format\n"
					"-z\t\tLZO compress flows in output file.\n"
					"-j\t\tBZ2 compress flows in output file.\n"
					"-y\t\tLZ4 compress flows in output file.\n"
#ifdef HAVE_INFLUXDB
					"-i <influxurl>\tInfluxdb url for stats (example: http://localhost:8086/write?db=mydb&u=pippo&p=paperino)\n"
#endif
					"-t <time>\ttime for RRD update\n", name);
} /* usage */
111
112
/*
 * Append one raw record (flow, extension map, exporter or sampler record)
 * to the output buffer of every channel that writes a file.
 * Shadow profiles have no output file (nffile == NULL) and are skipped.
 */
static void AppendRecordToChannels(profile_channel_info_t *channels, unsigned int num_channels, void *record, uint32_t size) {
	unsigned int j;

	for ( j=0; j < num_channels; j++ ) {
		if ( channels[j].nffile != NULL )
			AppendToBuffer(channels[j].nffile, record, size);
	}
} // End of AppendRecordToChannels

/*
 * Profile all flows of the input file sequence (set up beforehand with
 * SetupInputFileSequence()) against the given channels.
 *
 * For each flow record, every channel's filter engine is evaluated. On a
 * match, the channel statistics are updated and, if the channel has an
 * output file, the raw record is appended to it. Extension maps, exporter
 * records and sampler records that are reported as new are appended to
 * every output file. All output buffers are flushed at the end.
 *
 * channels     - channel array (main() passes GetChannelInfoList())
 * num_channels - number of entries in channels
 * tslot        - time slot; not used by this function
 *
 * Exits the process with 255 on an inconsistent data block or an
 * undecodable extension map.
 */
static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot) {
	common_record_t *flow_record;
	nffile_t *nffile;
	uint32_t i;
	unsigned int j;
	int done, ret;

	(void)tslot;	// part of the interface, but not needed while processing

	nffile = GetNextFile(NULL, 0, 0);
	if ( !nffile ) {
		LogError("GetNextFile() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
		return;
	}
	if ( nffile == EMPTY_LIST ) {
		LogError("Empty file list. No files to process\n");
		return;
	}

	// store infos away for later use
	// although multiple files may be processed, it is assumed that all
	// have the same settings
	is_anonymized = IP_ANONYMIZED(nffile);
	strncpy(Ident, nffile->file_header->ident, IDENTLEN);
	Ident[IDENTLEN-1] = '\0';

	done = 0;
	while ( !done ) {
		uint32_t sumSize;

		// get next data block from file; a positive value is the block size
		ret = ReadBlock(nffile);

		switch (ret) {
			case NF_CORRUPT:
			case NF_ERROR:
				if ( ret == NF_CORRUPT )
					LogError("Skip corrupt data file '%s'\n",GetCurrentFilename());
				else
					LogError("Read error in file '%s': %s\n",GetCurrentFilename(), strerror(errno) );
				// fall through - get next file in chain
			case NF_EOF: {
				nffile_t *next = GetNextFile(nffile, 0, 0);
				if ( next == EMPTY_LIST ) {
					done = 1;
				}
				if ( next == NULL ) {
					done = 1;
					LogError("Unexpected end of file list\n");
				}
				continue;
			}
			default:
				// block read successfully
				break;
		}

		if ( nffile->block_header->id != DATA_BLOCK_TYPE_2 ) {
			LogError("Can't process block type %u. Skip block.\n", nffile->block_header->id);
			continue;
		}

		flow_record = nffile->buff_ptr;
		sumSize = 0;
		for ( i=0; i < nffile->block_header->NumRecords; i++ ) {
			// every record must fit into the block and be at least a header long,
			// otherwise the pointer advance below could loop or run past the buffer
			if ( (sumSize + flow_record->size) > ret || (flow_record->size < sizeof(record_header_t))) {
				LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
				exit(255);
			}
			sumSize += flow_record->size;

			switch ( flow_record->type ) {
				case CommonRecordType: {
					exporter_t *exp_info = exporter_list[flow_record->exporter_sysid];
					uint32_t map_id = flow_record->ext_map;
					master_record_t *master_record;

					if ( extension_map_list->slot[map_id] == NULL ) {
						LogError("Corrupt data file. Missing extension map %u. Skip record.\n", flow_record->ext_map);
						break;	// record is skipped; pointer is advanced below
					}

					master_record = &(extension_map_list->slot[map_id]->master_record);
					ExpandRecord_v2( flow_record, extension_map_list->slot[map_id],
						exp_info ? &(exp_info->info) : NULL, master_record);

					for ( j=0; j < num_channels; j++ ) {
						FilterEngine_t *engine = channels[j].engine;

						// apply profile filter; on no match -> next channel
						engine->nfrecord = (uint64_t *)master_record;
						if ( !(*engine->FilterEngine)(engine) )
							continue;

						// filter matched -> update statistics
						UpdateStat(&channels[j].stat_record, master_record);

						// shadow profiles do not have files
						if ( channels[j].nffile != NULL ) {
							UpdateStat(channels[j].nffile->stat_record, master_record);
							// write record to output buffer
							AppendToBuffer(channels[j].nffile, (void *)flow_record, flow_record->size);
						}
					} // End of for all channels

				} break;
				case ExtensionMapType: {
					extension_map_t *map = (extension_map_t *)flow_record;

					switch ( Insert_Extension_Map(extension_map_list, map) ) {
						case 0:
							// map already known and flushed
							break;
						case 1:
							// flush new map
							AppendRecordToChannels(channels, num_channels, (void *)map, map->size);
							break;
						default:
							LogError("Corrupt data file. Unable to decode at %s line %d\n", __FILE__, __LINE__);
							exit(255);
					}
				} break;
				case ExporterInfoRecordType: {
					int err = AddExporterInfo((exporter_info_record_t *)flow_record);
					if ( err == 0 ) {
						LogError("Failed to add Exporter Record\n");
					} else if ( err == 1 ) {
						// flush new exporter
						AppendRecordToChannels(channels, num_channels, (void *)flow_record, flow_record->size);
					}
				} break;
				case SamplerInfoRecordype: {
					int err = AddSamplerInfo((sampler_info_record_t *)flow_record);
					if ( err == 0 ) {
						LogError("Failed to add Sampler Record\n");
					} else if ( err == 1 ) {
						// flush new sampler
						AppendRecordToChannels(channels, num_channels, (void *)flow_record, flow_record->size);
					}
				} break;
				case LegacyRecordType1:
				case LegacyRecordType2:
				case ExporterStatRecordType:
					// Silently skip exporter records
					break;
				default: {
					LogError("Skip unknown record type %i\n", flow_record->type);
				}
			}
			// Advance pointer by number of bytes for netflow record
			flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);

		} // End of for all NumRecords
	} // End of while !done

	// flush remaining output buffers - shadow profiles do not have files.
	for ( j=0; j < num_channels; j++ ) {
		if ( channels[j].nffile != NULL && channels[j].nffile->block_header->NumRecords ) {
			if ( WriteBlock(channels[j].nffile) <= 0 ) {
				LogError("Failed to write output buffer to disk: '%s'\n" , strerror(errno));
			}
		}
	}
	CloseFile(nffile);
	DisposeFile(nffile);

} // End of process_data
305
ParseParams(char * profile_datadir)306 static profile_param_info_t *ParseParams (char *profile_datadir) {
307 struct stat stat_buf;
308 char line[512], path[MAXPATHLEN], *p, *q, *s;
309 profile_param_info_t *profile_list;
310 profile_param_info_t **list = &profile_list;
311
312 profile_list = NULL;
313 while ( ( fgets(line, 512, stdin) != NULL )) {
314 line[511] = '\0';
315
316 if ( *list == NULL )
317 *list = (profile_param_info_t *)malloc(sizeof(profile_param_info_t));
318 // else we come from a continue statement with illegal data - overwrite
319
320 if ( !*list) {
321 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
322 return NULL;
323 }
324
325 (*list)->next = NULL;
326 (*list)->profilegroup = NULL;
327 (*list)->profilename = NULL;
328 (*list)->channelname = NULL;
329 (*list)->channel_sourcelist = NULL;
330 (*list)->profiletype = 0;
331
332 // delete '\n' at the end of line
333 // format of stdin config line:
334 // <profilegroup>#<profilename>#<profiletype>#<channelname>#<channel_sourcelist>
335 p = strchr(line, '\n');
336 if ( p ) *p = '\0';
337 LogInfo("Process line '%s'\n", line);
338
339 q = line;
340 p = strchr(q, '#');
341 if ( p )
342 *p = '\0';
343
344 s = line;
345
346 // savety check: if no separator found loop to next line
347 if ( !p ) {
348 LogError("Incomplete line - channel skipped.\n");
349 continue;
350 }
351
352 q = p;
353 q++;
354
355 p = strchr(q, '#');
356 if ( p )
357 *p = '\0';
358
359 snprintf(path, MAXPATHLEN-1, "%s/%s/%s", profile_datadir, s, q);
360 path[MAXPATHLEN-1] = '\0';
361 if ( stat(path, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
362 LogError("profile '%s' not found in group %s. Skipped.\n", q, s);
363 continue;
364 }
365
366 (*list)->profilegroup = strdup(s);
367 (*list)->profilename = strdup(q);
368
369 // savety check: if no separator found loop to next line
370 if ( !p ) {
371 LogError("Incomplete line - channel skipped.\n");
372 continue;
373 }
374
375 q = p;
376 q++;
377
378 p = strchr(q, '#');
379 if ( p )
380 *p = '\0';
381
382 s = q;
383 while ( *s ) {
384 if ( *s < '0' || *s > '9' ) {
385 LogError("Not a valid number: %s\n", q);
386 s = NULL;
387 break;
388 }
389 s++;
390 }
391 if ( s == NULL )
392 continue;
393
394 (*list)->profiletype = (int)strtol(q, (char **)NULL, 10);
395
396 // savety check: if no separator found loop to next line
397 if ( !p ) {
398 LogError("Incomplete line - channel skipped.\n");
399 continue;
400 }
401
402 q = p;
403 q++;
404
405 p = strchr(q, '#');
406 if ( p )
407 *p = '\0';
408
409 snprintf(path, MAXPATHLEN-1, "%s/%s/%s/%s", profile_datadir, (*list)->profilegroup, (*list)->profilename, q);
410 path[MAXPATHLEN-1] = '\0';
411 if ( stat(path, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
412 LogError("channel '%s' in profile '%s' not found. Skipped.\n", q, (*list)->profilename);
413 continue;
414 }
415
416 (*list)->channelname = strdup(q);
417
418 if ( !p ) {
419 LogError("Incomplete line - Skipped.\n");
420 continue;
421 }
422
423 q = p;
424 q++;
425
426 p = strchr(q, '#');
427 if ( p )
428 *p = '\0';
429
430 // Skip leading '| chars
431 while ( *q && *q == '|' ) {
432 q++;
433 }
434 s = q;
435
436 // if q is already empty ( '\0' ) loop is not processed
437 while ( *s ) {
438 // as s[0] is not '\0' s[1] may be '\0' but still valid and in range
439 if ( s[0] == '|' && s[1] == '|' ) {
440 char *t = s;
441 t++;
442 while ( *t ) { // delete this empty channel name
443 t[0] = t[1];
444 t++;
445 }
446 } else
447 s++;
448 }
449 // we have no doublicate '|' here any more
450 // check if last char is an extra '|'
451 if ( *q && (q[strlen(q)-1] == '|') )
452 q[strlen(q)-1] = '\0';
453
454 if ( *q && (strcmp(q, "*") != 0) )
455 (*list)->channel_sourcelist = strdup(q);
456
457 list = &((*list)->next);
458 }
459
460 if ( *list != NULL ) {
461 free(*list);
462 *list = NULL;
463 }
464
465 if ( ferror(stdin) ) {
466 LogError("fgets() error: %s", strerror(errno));
467 return NULL;
468 }
469
470 return profile_list;
471
472 } // End of ParseParams
473
main(int argc,char ** argv)474 int main( int argc, char **argv ) {
475 unsigned int num_channels, compress;
476 struct stat stat_buf;
477 profile_param_info_t *profile_list;
478 char *rfile, *ffile, *filename, *Mdirs;
479 char *profile_datadir, *profile_statdir, *nameserver;
480 int c, syntax_only, subdir_index, stdin_profile_params;
481 time_t tslot;
482
483 profile_datadir = NULL;
484 profile_statdir = NULL;
485 Mdirs = NULL;
486 tslot = 0;
487 syntax_only = 0;
488 compress = NOT_COMPRESSED;
489 subdir_index = 0;
490 profile_list = NULL;
491 nameserver = NULL;
492 stdin_profile_params = 0;
493 is_anonymized = 0;
494
495 strncpy(Ident, "none", IDENTLEN);
496 Ident[IDENTLEN-1] = '\0';
497
498 // default file names
499 ffile = "filter.txt";
500 rfile = NULL;
501 while ((c = getopt(argc, argv, "D:HIL:p:P:hi:f:J;r:n:M:S:t:VzZ")) != EOF) {
502 switch (c) {
503 case 'h':
504 usage(argv[0]);
505 exit(0);
506 break;
507 case 'D':
508 nameserver = optarg;
509 if ( !set_nameserver(nameserver) ) {
510 exit(255);
511 }
512 break;
513 case 'I':
514 stdin_profile_params = 1;
515 break;
516 case 'L':
517 if ( !InitLog(0, "nfprofile", optarg, 0) )
518 exit(255);
519 break;
520 case 'Z':
521 syntax_only = 1;
522 break;
523 case 'p':
524 profile_datadir = optarg;
525 break;
526 case 'P':
527 profile_statdir = optarg;
528 break;
529 case 'S':
530 subdir_index = atoi(optarg);
531 break;
532 case 'V':
533 printf("%s: Version: %s\n",argv[0], nfdump_version);
534 exit(0);
535 break;
536 case 'f':
537 ffile = optarg;
538 break;
539 case 't':
540 tslot = atoi(optarg);
541 break;
542 case 'M':
543 Mdirs = optarg;
544 break;
545 case 'r':
546 rfile = optarg;
547 break;
548 case 'j':
549 if ( compress ) {
550 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
551 exit(255);
552 }
553 compress = BZ2_COMPRESSED;
554 break;
555 case 'y':
556 if ( compress ) {
557 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
558 exit(255);
559 }
560 compress = LZ4_COMPRESSED;
561 break;
562 case 'z':
563 if ( compress ) {
564 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
565 exit(255);
566 }
567 compress = LZO_COMPRESSED;
568 break;
569 #ifdef HAVE_INFLUXDB
570 case 'i':
571 if ( optarg != NULL )
572 strncpy(influxdb_url, optarg, 1024);
573 else {
574 LogError("Missing argument for -i <influx URL>\n");
575 exit(255);
576 }
577 influxdb_url[1023] = '\0';
578 break;
579 #endif
580 default:
581 usage(argv[0]);
582 exit(0);
583 }
584 }
585
586 if ( subdir_index && !InitHierPath(subdir_index) ) {
587 exit(255);
588 }
589
590 if ( !profile_datadir ) {
591 LogError("Profile data directory required!\n");
592 exit(255);
593 }
594
595 if ( !profile_statdir ) {
596 profile_statdir = profile_datadir;
597 }
598
599 if ( stat(profile_datadir, &stat_buf) || !S_ISDIR(stat_buf.st_mode) ) {
600 LogError("'%s' not a directory\n", profile_datadir);
601 exit(255);
602 }
603
604 if ( stdin_profile_params ) {
605 profile_list = ParseParams(profile_datadir);
606 if ( !profile_list ) {
607 exit(254);
608 }
609 }
610
611 if ( syntax_only ) {
612 filename = NULL;
613 rfile = NULL;
614 } else {
615 char *p;
616 if ( rfile == NULL ) {
617 LogError("-r filename required!\n");
618 exit(255);
619 }
620 p = strrchr(rfile, '/');
621 filename = p == NULL ? rfile : ++p;
622 if ( strlen(filename) == 0 ) {
623 LogError("Filename error: zero length filename\n");
624 exit(254);
625 }
626 }
627
628 if ( chdir(profile_datadir)) {
629 LogError("Error can't chdir to '%s': %s", profile_datadir, strerror(errno));
630 exit(255);
631 }
632
633 num_channels = InitChannels(profile_datadir, profile_statdir, profile_list, ffile, filename, subdir_index, syntax_only, compress);
634
635 // nothing to do
636 if ( num_channels == 0 ) {
637 LogInfo("No channels to process.\n");
638 return 0;
639 }
640
641 if ( syntax_only ) {
642 printf("Syntax check done.\n");
643 return 0;
644 }
645
646 if ( !rfile ) {
647 LogError("Input file (-r) required!\n");
648 exit(255);
649 }
650
651 extension_map_list = InitExtensionMaps(NEEDS_EXTENSION_LIST);
652 if ( !InitExporterList() ) {
653 exit(255);
654 }
655
656 SetupInputFileSequence(Mdirs,rfile, NULL);
657
658 process_data(GetChannelInfoList(), num_channels, tslot);
659
660 CloseChannels(tslot, compress);
661
662 FreeExtensionMaps(extension_map_list);
663
664 return 0;
665 }
666