1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #include "config.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <stdarg.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <time.h>
42 #include <sys/time.h>
43 #include <netdb.h>
44 #include <pwd.h>
45 #include <grp.h>
46 #include <unistd.h>
47 #include <sys/wait.h>
48 #include <sys/stat.h>
49 #include <sys/param.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
52 #include <fcntl.h>
53 #include <signal.h>
54 #include <sys/mman.h>
55 #include <string.h>
56 #include <dirent.h>
57
58 #ifdef PCAP
59 #include "pcap_reader.h"
60 #endif
61
62 #ifdef HAVE_STDINT_H
63 #include <stdint.h>
64 #endif
65
66 #include "util.h"
67 #include "nfdump.h"
68 #include "nffile.h"
69 #include "nfx.h"
70 #include "exporter.h"
71 #include "nfnet.h"
72 #include "flist.h"
73 #include "nfstatfile.h"
74 #include "bookkeeper.h"
75 #include "launch.h"
76 #include "collector.h"
77 #include "netflow_v1.h"
78 #include "netflow_v5_v7.h"
79 #include "netflow_v9.h"
80 #include "ipfix.h"
81
82 #ifdef HAVE_FTS_H
83 # include <fts.h>
84 #else
85 # include "fts_compat.h"
86 #define fts_children fts_children_compat
87 #define fts_close fts_close_compat
88 #define fts_open fts_open_compat
89 #define fts_read fts_read_compat
90 #define fts_set fts_set_compat
91 #endif
92
93 #include "expire.h"
94
95 #define DEFAULTCISCOPORT "9995"
96 #define DEFAULTHOSTNAME "127.0.0.1"
97 #define SENDSOCK_BUFFSIZE 200000
98
99 static void *shmem = NULL;
100 static int verbose = 0;
101 static uint32_t default_sampling = 1;
102 static uint32_t overwrite_sampling = 0;
103
104 extern uint32_t default_sampling; // the default sampling rate when nothing else applies. set by -S
105 extern uint32_t overwrite_sampling; // unconditionally overwrite sampling rate with given sampling rate -S
106
107 // Define a generic type to get data from socket or pcap file
108 typedef ssize_t (*packet_function_t)(int, void *, size_t, int, struct sockaddr *, socklen_t *);
109
110 /* module limited globals */
111 static FlowSource_t *FlowSource;
112
113 static int done, launcher_alive, periodic_trigger, launcher_pid;
114
115 static const char *nfdump_version = VERSION;
116
117
118 /* Local function Prototypes */
119 static void usage(char *name);
120
121 static void kill_launcher(int pid);
122
123 static void IntHandler(int signal);
124
125 static inline FlowSource_t *GetFlowSource(struct sockaddr_storage *ss);
126
127 static void daemonize(void);
128
129 static void SetPriv(char *userid, char *groupid );
130
131 static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
132 time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress);
133
134 /* Functions */
/*
 * usage - print the command line help text to stdout.
 *
 * name: program name inserted into the first line of the text
 *       (main() passes argv[0]).
 *
 * The stale '-H' entry was removed: the getopt() option string in main()
 * does not accept it, so advertising it only leads users into the
 * error path.
 */
static void usage(char *name) {
	printf("usage %s [options] \n"
		"-h\t\tthis text you see right here\n"
		"-u userid\tChange user to username\n"
		"-g groupid\tChange group to groupname\n"
		"-w\t\tSync file rotation with next 5min (default) interval\n"
		"-t interval\tset the interval to rotate nfcapd files\n"
		"-b host\t\tbind socket to host/IP addr\n"
		"-J mcastgroup\tJoin multicast group <mcastgroup>\n"
		"-p portnum\tlisten on port portnum\n"
		"-l basedir \tset the output directory. (no default) \n"
		"-S subdir\tSub directory format. see nfcapd(1) for format\n"
		"-I Ident\tset the ident string for stat file. (default 'none')\n"
		"-n Ident,IP,logdir\tAdd this flow source - multiple streams\n"
		"-M dir \t\tSet the output directory for dynamic sources.\n"

		"-P pidfile\tset the PID file\n"
		"-R IP[/port]\tRepeat incoming packets to IP address/port. Max 8 repeaters.\n"
		"-s rate\tset default sampling rate (default 1)\n"
		"-x process\tlaunch process after a new file becomes available\n"
		"-z\t\tLZO compress flows in output file.\n"
		"-y\t\tLZ4 compress flows in output file.\n"
		"-j\t\tBZ2 compress flows in output file.\n"
		"-B bufflen\tSet socket buffer to bufflen bytes\n"
		"-e\t\tExpire data at each cycle.\n"
		"-D\t\tFork to background\n"
		"-E\t\tPrint extended format of netflow data. For debugging purpose only.\n"
		"-T\t\tInclude extension tags in records.\n"
		"-4\t\tListen on IPv4 (default).\n"
		"-6\t\tListen on IPv6.\n"
		"-V\t\tPrint version and exit.\n"
		"-Z\t\tAdd timezone offset to filename.\n"
		, name);
} // End of usage
170
kill_launcher(int pid)171 void kill_launcher(int pid) {
172 int stat, i;
173 pid_t ret;
174
175 if ( pid == 0 )
176 return;
177
178 if ( launcher_alive ) {
179 LogInfo("Signal launcher[%i] to terminate.", pid);
180 kill(pid, SIGTERM);
181
182 // wait for launcher to teminate
183 for ( i=0; i<LAUNCHER_TIMEOUT; i++ ) {
184 if ( !launcher_alive )
185 break;
186 sleep(1);
187 }
188 if ( i >= LAUNCHER_TIMEOUT ) {
189 LogError("Launcher does not want to terminate - signal again");
190 kill(pid, SIGTERM);
191 sleep(1);
192 }
193 } else {
194 LogError("launcher[%i] already dead.", pid);
195 }
196
197 if ( (ret = waitpid (pid, &stat, 0)) == -1 ) {
198 LogError("wait for launcher failed: %s %i", strerror(errno), ret);
199 } else {
200 if ( WIFEXITED(stat) ) {
201 LogInfo("launcher exit status: %i", WEXITSTATUS(stat));
202 }
203 if ( WIFSIGNALED(stat) ) {
204 LogError("launcher terminated due to signal %i", WTERMSIG(stat));
205 }
206 }
207
208 } // End of kill_launcher
209
IntHandler(int signal)210 static void IntHandler(int signal) {
211
212 switch (signal) {
213 case SIGALRM:
214 periodic_trigger = 1;
215 break;
216 case SIGHUP:
217 case SIGINT:
218 case SIGTERM:
219 done = 1;
220 break;
221 case SIGCHLD:
222 launcher_alive = 0;
223 break;
224 default:
225 // ignore everything we don't know
226 break;
227 }
228
229 } /* End of IntHandler */
230
231
/*
 * daemonize - detach the process from its controlling terminal.
 *
 * Sequence: fork() (parent exits), setsid(), fork() again (parent
 * exits), then stdin/stdout/stderr are pointed at /dev/null.
 *
 * A failing fork() or setsid() is reported on stderr and terminates the
 * process with exit status 255, so the invoking shell can see that the
 * daemon did not start. A failing open() of /dev/null is reported and
 * the affected descriptor is left as inherited.
 */
static void daemonize(void) {
	// open mode per standard descriptor: 0 = stdin, 1 = stdout, 2 = stderr
	static const int	null_mode[3] = { O_RDONLY, O_WRONLY, O_WRONLY };
	int	fd, target;

	switch (fork()) {
		case 0:
			// child
			break;
		case -1:
			// error
			fprintf(stderr, "fork() error: %s\n", strerror(errno));
			exit(255);
			break;
		default:
			// parent
			_exit(0);
	}

	if (setsid() < 0) {
		fprintf(stderr, "setsid() error: %s\n", strerror(errno));
		exit(255);
	}

	// Double fork
	switch (fork()) {
		case 0:
			// child
			break;
		case -1:
			// error
			fprintf(stderr, "fork() error: %s\n", strerror(errno));
			exit(255);
			break;
		default:
			// parent
			_exit(0);
	}

	for ( target = 0; target < 3; target++ ) {
		fd = open("/dev/null", null_mode[target]);
		if ( fd < 0 ) {
			// descriptor 2 is replaced last, so stderr is still the inherited one here
			fprintf(stderr, "open() error on /dev/null: %s\n", strerror(errno));
			continue;
		}
		if ( fd != target ) {
			dup2(fd, target);
			close(fd);
		}
	}

} // End of daemonize
285
/*
 * ParseNumericID - strict decimal parser for numeric user/group ids.
 *
 * Returns 1 and stores the number in *value if str consists of decimal
 * digits only and fits into an unsigned long; returns 0 otherwise
 * (NULL, empty string, sign, white space, trailing characters, overflow).
 */
static int ParseNumericID(const char *str, unsigned long *value) {
	char			*end = NULL;
	unsigned long	parsed;

	if ( str == NULL || *str < '0' || *str > '9' )
		return 0;

	errno = 0;
	parsed = strtoul(str, &end, 10);
	if ( errno != 0 || end == str || *end != '\0' )
		return 0;

	*value = parsed;
	return 1;

} // End of ParseNumericID

/*
 * SetPriv - change the group and/or user id of the process.
 *
 * userid:  user name or numeric uid, NULL to keep the current user.
 * groupid: group name or numeric gid, NULL to keep the current group.
 *
 * Does nothing if both arguments are NULL. Otherwise the caller must be
 * root. Names are resolved with getpwnam()/getgrnam(); if no entry is
 * found the argument is parsed as a plain decimal number. An id of 0,
 * or an argument that cannot be resolved, is rejected. The group is
 * switched before the user. Every failure is reported and terminates
 * the process with exit status 255.
 */
static void SetPriv(char *userid, char *groupid ) {
	struct passwd	*pw_entry;
	struct group	*gr_entry;
	unsigned long	numeric;
	uid_t	newuid = 0;
	gid_t	newgid = 0;
	int		err;

	if ( userid == NULL && groupid == NULL )
		return;

	if ( getuid() != 0 ) {
		LogError("Only root wants to change uid/gid");
		fprintf(stderr, "ERROR: Only root wants to change uid/gid\n");
		exit(255);
	}

	if ( userid ) {
		pw_entry = getpwnam(userid);
		if ( pw_entry ) {
			newuid = pw_entry->pw_uid;
		} else if ( ParseNumericID(userid, &numeric) && (unsigned long)(uid_t)numeric == numeric ) {
			// numeric id that survives the conversion to uid_t unchanged
			newuid = (uid_t)numeric;
		}

		if ( newuid == 0 ) {
			fprintf (stderr,"Invalid user '%s'\n", userid);
			exit(255);
		}
	}

	if ( groupid ) {
		gr_entry = getgrnam(groupid);
		if ( gr_entry ) {
			newgid = gr_entry->gr_gid;
		} else if ( ParseNumericID(groupid, &numeric) && (unsigned long)(gid_t)numeric == numeric ) {
			newgid = (gid_t)numeric;
		}

		if ( newgid == 0 ) {
			fprintf (stderr,"Invalid group '%s'\n", groupid);
			exit(255);
		}

		err = setgid(newgid);
		if ( err ) {
			LogError("Can't set group id %ld for group '%s': %s",   (long)newgid, groupid, strerror(errno));
			fprintf (stderr,"Can't set group id %ld for group '%s': %s\n", (long)newgid, groupid, strerror(errno));
			exit(255);
		}

	}

	// the user id goes last: after setuid() the process may no longer change its group
	if ( newuid ) {
		err = setuid(newuid);
		if ( err ) {
			LogError("Can't set user id %ld for user '%s': %s",   (long)newuid, userid, strerror(errno));
			fprintf (stderr,"Can't set user id %ld for user '%s': %s\n", (long)newuid, userid, strerror(errno));
			exit(255);
		}
	}

} // End of SetPriv
341
/*
 * format_file_block_header - print the NumRecords, size and id fields
 * of a data block header to stdout. run() calls it in verbose mode
 * before a block is flushed.
 *
 * header: block header to print.
 *
 * The first label now names the field that is actually printed
 * (NumRecords); it used to read "NumBlocks".
 */
static void format_file_block_header(data_block_header_t *header) {

	printf("\n"
		"File Block Header: \n"
		" NumRecords = %10u\n"
		" Size = %10u\n"
		" id = %10u\n",
		header->NumRecords,
		header->size,
		header->id);

} // End of format_file_block_header
354
355 #include "nffile_inline.c"
356 #include "collector_inline.c"
357
/*
 * ArmRotationAlarm - schedule SIGALRM for the given deadline.
 *
 * deadline: absolute time at which the process wants to be woken up.
 * now:      current time.
 *
 * The delay is clamped to at least one second. A zero argument would
 * cancel the alarm and a negative one would turn into a huge unsigned
 * value; in both cases file rotation would stall until the next packet
 * arrives.
 */
static void ArmRotationAlarm(time_t deadline, time_t now) {
	long	delay = (long)(deadline - now);

	if ( delay < 1 )
		delay = 1;
	alarm((unsigned int)delay);

} // End of ArmRotationAlarm

/*
 * run - main collector loop.
 *
 * Receives packets, forwards a copy to all configured repeaters, hands
 * the packet to the decoder matching the version field of the common
 * header and rotates the output files of all flow sources once per time
 * window. Returns when 'done' is set (signal handler, pcap EOF, end of
 * blast test) or on a fatal error.
 *
 * receive_packet: packet source; only used when compiled with PCAP,
 *                 otherwise recvfrom() is called directly.
 * socket:         descriptor to receive from.
 * repeater:       array of MAX_REPEATERS entries; the first entry with a
 *                 NULL hostname ends the list.
 * twin:           length of a rotation time window in seconds.
 * t_begin:        start time of the first window.
 * report_seq:     not used by this function.
 * use_subdirs:    non zero: files are moved into the sub directory
 *                 returned by GetSubDir().
 * time_extension: strftime() format for the timestamp part of the file name.
 * compress:       compression type passed to OpenNewFile().
 */
static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
	time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) {
	common_flow_header_t	*nf_header;
	FlowSource_t			*fs;
	struct sockaddr_storage	nf_sender;
	socklen_t	nf_sender_size;
	time_t		t_start, t_now;
	uint64_t	export_packets;
	uint32_t	blast_cnt, blast_failures, ignored_packets;
	uint16_t	version;
	ssize_t		cnt;
	void		*in_buff;
	int			err;
	srecord_t	*commbuff;

	if ( !Init_v1(verbose) || !Init_v5_v7_input(verbose, default_sampling, overwrite_sampling) ||
		 !Init_v9(verbose, default_sampling, overwrite_sampling) || !Init_IPFIX(verbose, default_sampling, overwrite_sampling) )
		return;

	in_buff = malloc(NETWORK_INPUT_BUFF_SIZE);
	if ( !in_buff ) {
		LogError("malloc() allocation error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
		return;
	}

	// init vars
	commbuff  = (srecord_t *)shmem;
	nf_header = (common_flow_header_t *)in_buff;

	// Init each netflow source output data buffer
	fs = FlowSource;
	while ( fs ) {

		// prepare file
		fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
		if ( !fs->nffile ) {
			free(in_buff);
			return;
		}
		// init vars
		fs->bad_packets = 0;
		fs->first_seen  = 0xffffffffffffLL;
		fs->last_seen   = 0;

		// next source
		fs = fs->next;
	}

	export_packets = blast_cnt = blast_failures = 0;
	t_start = t_begin;

	cnt = 0;
	periodic_trigger = 0;
	ignored_packets  = 0;

	// wake up at least at next time slot (twin) + 1s
	ArmRotationAlarm(t_start + twin + 1, time(NULL));
	/*
	 * Main processing loop:
	 * this loop, continues until done = 1, set by the signal handler
	 * The while loop will be left by the periodic file renaming code
	 * for proper cleanup
	 */
	while ( 1 ) {
		struct timeval tv;
		int i;

		/* read next bunch of data into begin of input buffer */
		if ( !done) {
			// value-result argument: must hold the buffer size on every call
			nf_sender_size = sizeof(nf_sender);
#ifdef PCAP
			// Debug code to read from pcap file, or from socket
			cnt = receive_packet(socket, in_buff, NETWORK_INPUT_BUFF_SIZE , 0,
						(struct sockaddr *)&nf_sender, &nf_sender_size);

			// in case of reading from file EOF => -2
			if ( cnt == -2 )
				done = 1;
#else
			cnt = recvfrom (socket, in_buff, NETWORK_INPUT_BUFF_SIZE , 0,
						(struct sockaddr *)&nf_sender, &nf_sender_size);
#endif

			if ( cnt == -1 && errno != EINTR ) {
				LogError("ERROR: recvfrom: %s", strerror(errno));
				continue;
			}

			// forward the packet only if something was received; a negative
			// cnt (EINTR, pcap EOF) must never be used as a length
			if ( cnt >= 0 ) {
				// test the index before the entry is dereferenced
				for ( i = 0; i < MAX_REPEATERS && repeater[i].hostname; i++ ) {
					ssize_t len;
					len = sendto(repeater[i].sockfd, in_buff, cnt, 0,
							(struct sockaddr *)&(repeater[i].addr), repeater[i].addrlen);
					if ( len < 0 ) {
						LogError("ERROR: sendto(): %s", strerror(errno));
					}
				}
			}
		}

		/* Periodic file renaming, if time limit reached or if we are done.  */
		gettimeofday(&tv, NULL);
		t_now = tv.tv_sec;

		if ( ((t_now - t_start) >= twin) || done ) {
			struct tm *now;
			char *subdir, fmt[MAXTIMESTRING];

			alarm(0);
			now = localtime(&t_start);
			strftime(fmt, sizeof(fmt), time_extension, now);

			// prepare sub dir hierarchy
			if ( use_subdirs ) {
				subdir = GetSubDir(now);
				if ( !subdir ) {
					// failed to generate subdir path - put flows into base directory
					LogError("Failed to create subdir path!");
					subdir = NULL;
				}
			} else {
				subdir = NULL;
			}

			// for each flow source update the stats, close the file and re-initialize the new file
			fs = FlowSource;
			while ( fs ) {
				char nfcapd_filename[MAXPATHLEN];
				char error[255];
				nffile_t *nffile = fs->nffile;

				if ( verbose ) {
					// Dump to stdout
					format_file_block_header(nffile->block_header);
				}

				if ( nffile->block_header->NumRecords ) {
					// flush current buffer to disc
					if ( WriteBlock(nffile) <= 0 )
						LogError("Ident: %s, failed to write output buffer to disk: '%s'" ,
							fs->Ident, strerror(errno));
				} // else - no new records in current block


				// prepare filename
				if ( subdir ) {
					if ( SetupSubDir(fs->datadir, subdir, error, 255) ) {
						snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/%s/nfcapd.%s", fs->datadir, subdir, fmt);
					} else {
						LogError("Ident: %s, Failed to create sub hier directories: %s", fs->Ident, error );
						// skip subdir - put flows directly into current directory
						snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/nfcapd.%s", fs->datadir, fmt);
					}
				} else {
					snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/nfcapd.%s", fs->datadir, fmt);
				}
				nfcapd_filename[MAXPATHLEN-1] = '\0';

				// update stat record
				// if no flows were collected, fs->last_seen is still 0
				// set first_seen to start of this time slot, with twin window size.
				if ( fs->last_seen == 0 ) {
					fs->first_seen = (uint64_t)1000 * (uint64_t)t_start;
					fs->last_seen  = (uint64_t)1000 * (uint64_t)(t_start + twin);
				}
				nffile->stat_record->first_seen = fs->first_seen/1000;
				nffile->stat_record->msec_first = fs->first_seen - nffile->stat_record->first_seen*1000;
				nffile->stat_record->last_seen  = fs->last_seen/1000;
				nffile->stat_record->msec_last  = fs->last_seen - nffile->stat_record->last_seen*1000;

				// Flush Exporter Stat to file
				FlushExporterStats(fs);
				// Close file
				CloseUpdateFile(nffile, fs->Ident);

				// if rename fails, we are in big trouble, as we need to get rid of the old .current file
				// otherwise, we will lose flows and can not continue collecting new flows
				err = rename(fs->current, nfcapd_filename);
				if ( err ) {
					LogError("Ident: %s, Can't rename dump file: %s", fs->Ident,  strerror(errno));
					LogError("Ident: %s, Serious Problem! Fix manually", fs->Ident);
					if ( launcher_pid )
						commbuff->failed = 1;

					// we do not update the books here, as the file failed to rename properly
					// otherwise the books may be wrong
				} else {
					struct stat file_stat;
					if ( launcher_pid )
						commbuff->failed = 0;

					// Update books - only with a valid stat record
					if ( stat(nfcapd_filename, &file_stat) == 0 ) {
						UpdateBooks(fs->bookkeeper, t_start, 512*file_stat.st_blocks);
					} else {
						LogError("Ident: %s, stat() failed on '%s': %s", fs->Ident, nfcapd_filename, strerror(errno));
					}
				}

				// log stats
				LogInfo("Ident: '%s' Flows: %llu, Packets: %llu, Bytes: %llu, Sequence Errors: %u, Bad Packets: %u",
					fs->Ident, (unsigned long long)nffile->stat_record->numflows,
					(unsigned long long)nffile->stat_record->numpackets,
					(unsigned long long)nffile->stat_record->numbytes, nffile->stat_record->sequence_failure, fs->bad_packets);

				// reset stats
				fs->bad_packets = 0;
				fs->first_seen  = 0xffffffffffffLL;
				fs->last_seen   = 0;

				if ( !done ) {
					nffile = OpenNewFile(fs->current, nffile, compress, 0, NULL);
					if ( !nffile ) {
						// NOTE(review): this only leaves the per-source loop; the collector
						// keeps running and fs->nffile is not updated - confirm what
						// OpenNewFile() leaves behind on failure
						LogError("killed due to fatal error: ident: %s", fs->Ident);
						break;
					}
				}

				// Dump all extension maps and exporters to the buffer
				FlushStdRecords(fs);

				// next flow source
				fs = fs->next;

			} // end of while (fs)

			// trigger launcher if required
			if ( launcher_pid ) {
				// Signal launcher

				strncpy(commbuff->tstring, fmt, MAXTIMESTRING);
				commbuff->tstring[MAXTIMESTRING-1] = '\0';

				commbuff->tstamp = t_start;
				if ( subdir ) {
					snprintf(commbuff->fname, MAXPATHLEN-1, "%s/nfcapd.%s", subdir, fmt);
				} else {
					snprintf(commbuff->fname, MAXPATHLEN-1, "nfcapd.%s", fmt);
				}
				commbuff->fname[MAXPATHLEN-1] = '\0';

				if ( launcher_alive ) {
					LogInfo("Signal launcher");
					kill(launcher_pid, SIGHUP);
				} else
					LogError("ERROR: Launcher died unexpectedly!");

			}

			LogInfo("Total ignored packets: %u", ignored_packets);
			ignored_packets = 0;

			if ( done )
				break;

			// update alarm for next cycle
			t_start += twin;
			/* t_start = filename time stamp: begin of slot
			 * + twin = end of next time interval
			 * + 1 = act at least 1s after time window expired
			 * - t_now = difference value to now
			 */
			ArmRotationAlarm(t_start + twin + 1, t_now);

		}

		/* check for error condition or done . errno may only be EINTR */
		if ( cnt < 0 ) {
			if ( periodic_trigger ) {
				// alarm triggered, no new flow data
				periodic_trigger = 0;
				continue;
			}
			if ( done )
				// signaled to terminate - exit from loop
				break;
			else {
				/* this should never be executed as it should be caught in other places */
				LogError("error condition in '%s', line '%d', cnt: %i", __FILE__, __LINE__ ,(int)cnt);
				continue;
			}
		}

		/* enough data? */
		if ( cnt == 0 )
			continue;

		// get flow source record for current packet, identified by sender IP address
		fs = GetFlowSource(&nf_sender);
		if ( fs == NULL ) {
			fs = AddDynamicSource(&FlowSource, &nf_sender);
			if ( fs == NULL ) {
				LogError("Skip UDP packet. Ignored packets so far %u packets", ignored_packets);
				ignored_packets++;
				continue;
			}
			if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
				LogError("Failed to initialise bookkeeper for new source");
				// fatal error
				free(in_buff);
				return;
			}
			fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
			if ( !fs->nffile ) {
				LogError("Failed to open new collector file");
				free(in_buff);
				return;
			}
		}

		/* check for too little data - cnt must be > 0 at this point */
		if ( (size_t)cnt < sizeof(common_flow_header_t) ) {
			LogError("Ident: %s, Data length error: too little data for common netflow header. cnt: %i",fs->Ident, (int)cnt);
			fs->bad_packets++;
			continue;
		}

		fs->received = tv;
		/* Process data - have a look at the common header */
		version = ntohs(nf_header->version);
		switch (version) {
			case 1:
				Process_v1(in_buff, cnt, fs);
				break;
			case 5: // fall through
			case 7:
				Process_v5_v7(in_buff, cnt, fs);
				break;
			case 9:
				Process_v9(in_buff, cnt, fs);
				break;
			case 10:
				Process_IPFIX(in_buff, cnt, fs);
				break;
			case 255:
				// blast test header
				if ( verbose ) {
					uint16_t count = ntohs(nf_header->count);
					if ( blast_cnt != count ) {
						blast_cnt = count;
						blast_failures++;
					} else {
						blast_cnt++;
					}
					if ( blast_cnt == 65535 ) {
						fprintf(stderr, "Total missed packets: %u\n", blast_failures);
						done = 1;
					}
					break;
				}
				// not verbose: blast packets are treated like any unknown version
				/* fallthrough */
			default:
				// data error, while reading data from socket
				LogError("Ident: %s, Error reading netflow header: Unexpected netflow version %i", fs->Ident, version);
				fs->bad_packets++;
				continue;

				// not reached
				break;
		}
		// each Process_xx function has to process the entire input buffer, therefore it's empty now.
		export_packets++;

		// flush current buffer to disc
		if ( fs->nffile->block_header->size > BUFFSIZE ) {
			// fishy! - we already wrote into someone else's memory! - I'm sorry
			// reset output buffer - data may be lost, as we do not know, where it happened
			fs->nffile->block_header->size 		 = 0;
			fs->nffile->block_header->NumRecords = 0;
			fs->nffile->buff_ptr = (void *)((pointer_addr_t)fs->nffile->block_header + sizeof(data_block_header_t) );
			LogError("### Software bug ### Ident: %s, output buffer overflow: expect memory inconsitency", fs->Ident);
		}
	}

	if ( verbose && blast_failures ) {
		fprintf(stderr, "Total missed packets: %u\n", blast_failures);
	}
	free(in_buff);

	fs = FlowSource;
	while ( fs ) {
		DisposeFile(fs->nffile);
		fs = fs->next;
	}

} /* End of run */
740
main(int argc,char ** argv)741 int main(int argc, char **argv) {
742
743 char *bindhost, *datadir, pidstr[32], *launch_process;
744 char *userid, *groupid, *checkptr, *listenport, *mcastgroup, *extension_tags;
745 char *Ident, *dynsrcdir, *time_extension, pidfile[MAXPATHLEN];
746 struct stat fstat;
747 packet_function_t receive_packet;
748 repeater_t repeater[MAX_REPEATERS];
749 FlowSource_t *fs;
750 struct sigaction act;
751 int family, bufflen;
752 time_t twin, t_start;
753 int sock, do_daemonize, expire, spec_time_extension, report_sequence;
754 int subdir_index, sampling_rate, compress;
755 int c, i;
756 #ifdef PCAP
757 char *pcap_file = NULL;
758 #endif
759
760 receive_packet = recvfrom;
761 verbose = do_daemonize = 0;
762 bufflen = 0;
763 family = AF_UNSPEC;
764 launcher_pid = 0;
765 launcher_alive = 0;
766 report_sequence = 0;
767 listenport = DEFAULTCISCOPORT;
768 bindhost = NULL;
769 mcastgroup = NULL;
770 pidfile[0] = 0;
771 launch_process = NULL;
772 userid = groupid = NULL;
773 twin = TIME_WINDOW;
774 datadir = NULL;
775 subdir_index = 0;
776 time_extension = "%Y%m%d%H%M";
777 spec_time_extension = 0;
778 expire = 0;
779 sampling_rate = 1;
780 compress = NOT_COMPRESSED;
781 memset((void *)&repeater, 0, sizeof(repeater));
782 for ( i = 0; i < MAX_REPEATERS; i++ ) {
783 repeater[i].family = AF_UNSPEC;
784 }
785 Ident = "none";
786 FlowSource = NULL;
787 extension_tags = DefaultExtensions;
788 dynsrcdir = NULL;
789
790 while ((c = getopt(argc, argv, "46ef:whEVI:DB:b:jl:J:M:n:p:P:R:S:s:T:t:x:Xru:g:yzZ")) != EOF) {
791 switch (c) {
792 case 'h':
793 usage(argv[0]);
794 exit(0);
795 break;
796 case 'u':
797 userid = optarg;
798 break;
799 case 'g':
800 groupid = optarg;
801 break;
802 case 'e':
803 expire = 1;
804 break;
805 case 'f': {
806 #ifdef PCAP
807 struct stat fstat;
808 pcap_file = optarg;
809 stat(pcap_file, &fstat);
810 if ( !S_ISREG(fstat.st_mode) ) {
811 fprintf(stderr, "Not a regular file: %s\n", pcap_file);
812 exit(254);
813 }
814 #else
815 fprintf(stderr, "PCAP reader not compiled! Option ignored!\n");
816 #endif
817 } break;
818 case 'E':
819 verbose = 1;
820 Setv6Mode(1);
821 break;
822 case 'V':
823 printf("%s: Version: %s\n",argv[0], nfdump_version);
824 exit(0);
825 break;
826 case 'D':
827 do_daemonize = 1;
828 break;
829 case 'I':
830 Ident = strdup(optarg);
831 break;
832 case 'M':
833 dynsrcdir = strdup(optarg);
834 if ( strlen(dynsrcdir) > MAXPATHLEN ) {
835 fprintf(stderr, "ERROR: Path too long!\n");
836 exit(255);
837 }
838 if ( stat(dynsrcdir, &fstat) < 0 ) {
839 fprintf(stderr, "stat() failed on %s: %s\n", dynsrcdir, strerror(errno));
840 exit(255);
841 }
842 if ( !(fstat.st_mode & S_IFDIR) ) {
843 fprintf(stderr, "No such directory: %s\n", dynsrcdir);
844 break;
845 }
846 if ( !SetDynamicSourcesDir(&FlowSource, dynsrcdir) ) {
847 fprintf(stderr, "-l, -M and -n are mutually exclusive\n");
848 break;
849 }
850 break;
851 case 'n':
852 if ( AddFlowSource(&FlowSource, optarg) != 1 )
853 exit(255);
854 break;
855 case 'w':
856 // allow for compatibility - always sync timeslot
857 break;
858 case 'B':
859 bufflen = strtol(optarg, &checkptr, 10);
860 if ( (checkptr != NULL && *checkptr == 0) && bufflen > 0 )
861 break;
862 fprintf(stderr,"Argument error for -B\n");
863 exit(255);
864 case 'b':
865 bindhost = optarg;
866 break;
867 case 'J':
868 mcastgroup = optarg;
869 break;
870 case 'p':
871 listenport = optarg;
872 break;
873 case 'P':
874 if ( optarg[0] == '/' ) { // absolute path given
875 strncpy(pidfile, optarg, MAXPATHLEN-1);
876 } else { // path relative to current working directory
877 char tmp[MAXPATHLEN];
878 if ( !getcwd(tmp, MAXPATHLEN-1) ) {
879 fprintf(stderr, "Failed to get current working directory: %s\n", strerror(errno));
880 exit(255);
881 }
882 tmp[MAXPATHLEN-1] = 0;
883 if ( (strlen(tmp) + strlen(optarg) + 3) < MAXPATHLEN ) {
884 snprintf(pidfile, MAXPATHLEN - 3 - strlen(tmp), "%s/%s", tmp, optarg);
885 } else {
886 fprintf(stderr, "pidfile MAXPATHLEN error:\n");
887 exit(255);
888 }
889 }
890 // pidfile now absolute path
891 pidfile[MAXPATHLEN-1] = 0;
892 break;
893 case 'R': {
894 char *port, *hostname;
895 char *p = strchr(optarg, '/');
896 int i = 0;
897 if ( p ) {
898 *p++ = '\0';
899 port = strdup(p);
900 } else {
901 port = DEFAULTCISCOPORT;
902 }
903 hostname = strdup(optarg);
904 while ( repeater[i].hostname && (i < MAX_REPEATERS) ) i++;
905 if ( i == MAX_REPEATERS ) {
906 fprintf(stderr, "Too many packet repeaters! Max: %i repeaters allowed.\n", MAX_REPEATERS);
907 exit(255);
908 }
909 repeater[i].hostname = hostname;
910 repeater[i].port = port;
911
912 break; }
913 case 'r':
914 report_sequence = 1;
915 break;
916 case 's':
917 // a negative sampling rate is set as the overwrite sampling rate
918 sampling_rate = (int)strtol(optarg, (char **)NULL, 10);
919 if ( (sampling_rate == 0 ) ||
920 (sampling_rate < 0 && sampling_rate < -10000000) ||
921 (sampling_rate > 0 && sampling_rate > 10000000) ) {
922 fprintf(stderr, "Invalid sampling rate: %s\n", optarg);
923 exit(255);
924 }
925 break;
926 case 'T': {
927 size_t len = strlen(optarg);
928 extension_tags = optarg;
929 if ( len == 0 || len > 128 ) {
930 fprintf(stderr, "Extension length error. Unexpected option '%s'\n", extension_tags);
931 exit(255);
932 }
933 break; }
934 case 'l':
935 datadir = optarg;
936 if ( strlen(datadir) > MAXPATHLEN ) {
937 fprintf(stderr, "ERROR: Path too long!\n");
938 exit(255);
939 }
940 datadir = realpath(datadir, NULL);
941 if ( stat(datadir, &fstat) < 0 ) {
942 fprintf(stderr, "stat() failed on %s: %s\n", datadir, strerror(errno));
943 exit(255);
944 }
945 if ( !(fstat.st_mode & S_IFDIR) ) {
946 fprintf(stderr, "No such directory: %s\n", datadir);
947 break;
948 }
949 break;
950 case 'S':
951 subdir_index = atoi(optarg);
952 break;
953 case 't':
954 twin = atoi(optarg);
955 if ( twin < 2 ) {
956 LogError("time interval <= 2s not allowed");
957 exit(255);
958 }
959 if (twin < 60) {
960 time_extension = "%Y%m%d%H%M%S";
961 }
962 break;
963 case 'x':
964 launch_process = optarg;
965 break;
966 case 'j':
967 if ( compress ) {
968 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
969 exit(255);
970 }
971 compress = BZ2_COMPRESSED;
972 break;
973 case 'y':
974 if ( compress ) {
975 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
976 exit(255);
977 }
978 compress = LZ4_COMPRESSED;
979 break;
980 case 'z':
981 if ( compress ) {
982 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
983 exit(255);
984 }
985 compress = LZO_COMPRESSED;
986 break;
987 case 'Z':
988 time_extension = "%Y%m%d%H%M%z";
989 spec_time_extension = 1;
990 break;
991 case '4':
992 if ( family == AF_UNSPEC )
993 family = AF_INET;
994 else {
995 fprintf(stderr, "ERROR, Accepts only one protocol IPv4 or IPv6!\n");
996 exit(255);
997 }
998 break;
999 case '6':
1000 if ( family == AF_UNSPEC )
1001 family = AF_INET6;
1002 else {
1003 fprintf(stderr, "ERROR, Accepts only one protocol IPv4 or IPv6!\n");
1004 exit(255);
1005 }
1006 break;
1007 default:
1008 usage(argv[0]);
1009 exit(255);
1010 }
1011 }
1012
1013 if ( FlowSource == NULL && datadir == NULL && dynsrcdir == NULL ) {
1014 fprintf(stderr, "ERROR, Missing -n (-l/-I) or -M source definitions\n");
1015 exit(255);
1016 }
1017 if ( FlowSource == NULL && datadir != NULL && !AddDefaultFlowSource(&FlowSource, Ident, datadir) ) {
1018 fprintf(stderr, "Failed to add default data collector directory\n");
1019 exit(255);
1020 }
1021
1022 if ( bindhost && mcastgroup ) {
1023 fprintf(stderr, "ERROR, -b and -j are mutually exclusive!!\n");
1024 exit(255);
1025 }
1026
1027 if ( !InitLog(do_daemonize, argv[0], SYSLOG_FACILITY, verbose) ) {
1028 exit(255);
1029 }
1030
1031 if ( expire && spec_time_extension ) {
1032 fprintf(stderr, "ERROR, -Z timezone extension breaks expire -e\n");
1033 exit(255);
1034 }
1035
1036 InitExtensionMaps(NO_EXTENSION_LIST);
1037 SetupExtensionDescriptors(strdup(extension_tags));
1038
1039 // Debug code to read from pcap file
1040 #ifdef PCAP
1041 sock = 0;
1042 if ( pcap_file ) {
1043 printf("Setup pcap reader\n");
1044 setup_packethandler(pcap_file, NULL);
1045 receive_packet = NextPacket;
1046 } else
1047 #endif
1048 if ( mcastgroup )
1049 sock = Multicast_receive_socket (mcastgroup, listenport, family, bufflen);
1050 else
1051 sock = Unicast_receive_socket(bindhost, listenport, family, bufflen );
1052
1053 if ( sock == -1 ) {
1054 fprintf(stderr,"Terminated due to errors.\n");
1055 exit(255);
1056 }
1057
1058 i = 0;
1059 while ( repeater[i].hostname && (i < MAX_REPEATERS) ) {
1060 repeater[i].sockfd = Unicast_send_socket (repeater[i].hostname, repeater[i].port, repeater[i].family, bufflen,
1061 &repeater[i].addr, &repeater[i].addrlen );
1062 if ( repeater[i].sockfd <= 0 )
1063 exit(255);
1064 LogInfo("Replay flows to host: %s port: %s", repeater[i].hostname, repeater[i].port);
1065 i++;
1066 }
1067
1068 if ( sampling_rate < 0 ) {
1069 default_sampling = -sampling_rate;
1070 overwrite_sampling = default_sampling;
1071 } else {
1072 default_sampling = sampling_rate;
1073 }
1074
1075 SetPriv(userid, groupid);
1076
1077 if ( subdir_index && !InitHierPath(subdir_index) ) {
1078 close(sock);
1079 exit(255);
1080 }
1081
1082 // check if pid file exists and if so, if a process with registered pid is running
1083 if ( strlen(pidfile) ) {
1084 int pidf;
1085 pidf = open(pidfile, O_RDONLY, 0);
1086 if ( pidf > 0 ) {
1087 // pid file exists
1088 char s[32];
1089 ssize_t len;
1090 len = read(pidf, (void *)s, 31);
1091 close(pidf);
1092 s[31] = '\0';
1093 if ( len < 0 ) {
1094 fprintf(stderr, "read() error existing pid file: %s\n", strerror(errno));
1095 exit(255);
1096 } else {
1097 unsigned long pid = atol(s);
1098 if ( pid == 0 ) {
1099 // garbage - use this file
1100 unlink(pidfile);
1101 } else {
1102 if ( kill(pid, 0) == 0 ) {
1103 // process exists
1104 fprintf(stderr, "A process with pid %lu registered in pidfile %s is already running!\n",
1105 pid, strerror(errno));
1106 exit(255);
1107 } else {
1108 // no such process - use this file
1109 unlink(pidfile);
1110 }
1111 }
1112 }
1113 } else {
1114 if ( errno != ENOENT ) {
1115 fprintf(stderr, "open() error existing pid file: %s\n", strerror(errno));
1116 exit(255);
1117 } // else errno == ENOENT - no file - this is fine
1118 }
1119 }
1120
1121 if (argc - optind > 1) {
1122 usage(argv[0]);
1123 close(sock);
1124 exit(255);
1125 }
1126
1127 t_start = time(NULL);
1128 t_start = t_start - ( t_start % twin);
1129
1130 if ( do_daemonize ) {
1131 verbose = 0;
1132 daemonize();
1133 }
1134 if (strlen(pidfile)) {
1135 pid_t pid = getpid();
1136 int pidf = open(pidfile, O_RDWR|O_TRUNC|O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
1137 if ( pidf == -1 ) {
1138 LogError("Error opening pid file: '%s' %s", pidfile, strerror(errno));
1139 close(sock);
1140 exit(255);
1141 }
1142 snprintf(pidstr,31,"%lu\n", (unsigned long)pid);
1143 if ( write(pidf, pidstr, strlen(pidstr)) <= 0 ) {
1144 LogError("Error write pid file: '%s' %s", pidfile, strerror(errno));
1145 }
1146 close(pidf);
1147 }
1148
1149 done = 0;
1150 if ( launch_process || expire ) {
1151
1152 // create launcher comm memory struct
1153 shmem = mmap(0, sizeof(srecord_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
1154 if ( shmem == MAP_FAILED) {
1155 LogError("mmap() error in %s:%i: %s", __FILE__, __LINE__ , strerror(errno));
1156 close(sock);
1157 exit(255);
1158 }
1159
1160 launcher_pid = fork();
1161 switch (launcher_pid) {
1162 case 0:
1163 // child
1164 close(sock);
1165 launcher(shmem, FlowSource, launch_process, expire);
1166 _exit(0);
1167 break;
1168 case -1:
1169 LogError("fork() error: %s", strerror(errno));
1170 if ( strlen(pidfile) )
1171 unlink(pidfile);
1172 exit(255);
1173 break;
1174 default:
1175 // parent
1176 launcher_alive = 1;
1177 LogInfo("Launcher[%i] forked", launcher_pid);
1178 }
1179 }
1180
1181 fs = FlowSource;
1182 while ( fs ) {
1183 if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
1184 LogError("initialize bookkeeper failed.");
1185
1186 // release all already allocated bookkeepers
1187 fs = FlowSource;
1188 while ( fs && fs->bookkeeper ) {
1189 ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1190 fs = fs->next;
1191 }
1192 close(sock);
1193 if ( launcher_pid )
1194 kill_launcher(launcher_pid);
1195 if ( strlen(pidfile) )
1196 unlink(pidfile);
1197 exit(255);
1198 }
1199
1200 // Init the extension map list
1201 if ( !InitExtensionMapList(fs) ) {
1202 // error message goes to syslog
1203 exit(255);
1204 }
1205
1206 fs = fs->next;
1207 }
1208
1209 /* Signal handling */
1210 memset((void *)&act,0,sizeof(struct sigaction));
1211 act.sa_handler = IntHandler;
1212 sigemptyset(&act.sa_mask);
1213 act.sa_flags = 0;
1214 sigaction(SIGTERM, &act, NULL);
1215 sigaction(SIGINT, &act, NULL);
1216 sigaction(SIGHUP, &act, NULL);
1217 sigaction(SIGALRM, &act, NULL);
1218 sigaction(SIGCHLD, &act, NULL);
1219
1220 LogInfo("Startup.");
1221 run(receive_packet, sock, repeater, twin, t_start, report_sequence, subdir_index,
1222 time_extension, compress);
1223 close(sock);
1224 kill_launcher(launcher_pid);
1225
1226 fs = FlowSource;
1227 while ( fs && fs->bookkeeper ) {
1228 dirstat_t *dirstat;
1229 // if we do not auto expire and there is a stat file, update the stats before we leave
1230 if ( expire == 0 && ReadStatInfo(fs->datadir, &dirstat, LOCK_IF_EXISTS) == STATFILE_OK ) {
1231 UpdateBookStat(dirstat, fs->bookkeeper);
1232 WriteStatInfo(dirstat);
1233 LogInfo("Updating statinfo in directory '%s'", datadir);
1234 }
1235
1236 ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1237 fs = fs->next;
1238 }
1239
1240 LogInfo("Terminating nfcapd.");
1241 EndLog();
1242
1243 if ( strlen(pidfile) )
1244 unlink(pidfile);
1245
1246 return 0;
1247
1248 } /* End of main */
1249