1 /*
2  *  Copyright (c) 2013-2020, Peter Haag
3  *  All rights reserved.
4  *
5  *  Redistribution and use in source and binary forms, with or without
6  *  modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *   * Neither the name of the author nor the names of its contributors may be
14  *     used to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  *  POSSIBILITY OF SUCH DAMAGE.
28  *
29  */
30 
31 #include "config.h"
32 
33 #ifdef HAVE_FEATURES_H
34 #include <features.h>
35 #endif
36 
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/wait.h>
41 #include <sys/time.h>
42 #include <sys/stat.h>
43 #include <stdio.h>
44 #include <stdarg.h>
45 #include <pwd.h>
46 #include <grp.h>
47 #include <time.h>
48 #include <fcntl.h>
49 #include <sys/stat.h>
50 #include <sys/param.h>
51 #include <sys/socket.h>
52 #include <pthread.h>
53 #include <errno.h>
54 #include <signal.h>
55 #include <string.h>
56 #include <assert.h>
57 // #include <mcheck.h>
58 
59 #ifdef HAVE_STDINT_H
60 #include <stdint.h>
61 #endif
62 
63 #include <netinet/in.h>
64 #include <netinet/in_systm.h>
65 #include <netinet/ip.h>
66 #include <netinet/ip6.h>
67 #include <netinet/tcp.h>
68 #include <netinet/udp.h>
69 #include <netinet/ip_icmp.h>
70 #include <arpa/inet.h>
71 
72 
73 #ifdef HAVE_NET_BPF_H
74 # include <net/bpf.h>
75 #else
76 #ifdef HAVE_PCAP_BPF_H
77 # include <pcap/bpf.h>
78 #else
79 # error missing bpf header
80 #endif
81 #endif
82 
83 #include <pcap.h>
84 
85 #include "util.h"
86 #include "nfdump.h"
87 #include "nffile.h"
88 #include "nfx.h"
89 #include "expire.h"
90 #include "nfnet.h"
91 #include "flist.h"
92 #include "nfstatfile.h"
93 #include "bookkeeper.h"
94 #include "collector.h"
95 #include "exporter.h"
96 #include "ipfrag.h"
97 #include "flowtree.h"
98 #include "netflow_pcap.h"
99 #include "pcaproc.h"
100 
101 #define TIME_WINDOW     300
102 #define PROMISC         1
103 #define TIMEOUT         500
104 #define FILTER          ""
105 #define DEFAULT_DIR     "/var/tmp"
106 
107 #ifndef DLT_LINUX_SLL
108 #define DLT_LINUX_SLL   113
109 #endif
110 
111 #define EXPIREINTERVALL 10
112 
113 int verbose = 0;
114 
115 /*
116  * global static var: used by interrupt routine
117  */
118 #define PCAP_DUMPFILE "pcap.current"
119 
120 static const char *nfdump_version = VERSION;
121 
122 static int launcher_pid;
123 static pthread_mutex_t  m_done  = PTHREAD_MUTEX_INITIALIZER;
124 static pthread_cond_t terminate = PTHREAD_COND_INITIALIZER;
125 static pthread_key_t buffer_key;
126 
127 uint32_t linktype;
128 uint32_t linkoffset;
129 
130 // Common thread info struct
131 typedef struct thread_info_s {
132 	pthread_t tid;
133 	int		  done;
134 	int		  exit;
135 } thread_info_t;
136 
137 typedef struct p_pcap_flush_thread_args_s {
138 	// common thread info struct
139 	pthread_t tid;
140 	int		  done;
141 	int		  exit;
142 
143 	// the parent
144 	pthread_t parent;
145 
146 	// arguments
147 	int		subdir_index;
148 	char	*pcap_datadir;
149 	char	*time_extension;
150 	pcap_dev_t *pcap_dev;
151 	pcapfile_t *pcapfile;
152 } p_pcap_flush_thread_args_t;
153 
154 typedef struct p_packet_thread_args_s {
155 	// common thread info struct
156 	pthread_t tid;
157 	int		  done;
158 	int		  exit;
159 
160 	// the parent
161 	pthread_t parent;
162 
163 	// arguments
164 	NodeList_t *NodeList;		// push new nodes into this list
165 	pcap_dev_t *pcap_dev;
166 	time_t	t_win;
167 	int		subdir_index;
168 	char	*pcap_datadir;
169 	char	*time_extension;
170 	int		live;
171 } p_packet_thread_args_t;
172 
173 typedef struct p_flow_thread_args_s {
174 	// common thread info struct
175 	pthread_t tid;
176 	int	done;
177 	int	exit;
178 	// the parent
179 	pthread_t parent;
180 
181 	// arguments
182 	NodeList_t *NodeList;		// pop new nodes from this list
183 	FlowSource_t *fs;
184 	time_t	t_win;
185 	char	*time_extension;
186 	int		subdir_index;
187 	int		compress;
188 	int		live;
189 } p_flow_thread_args_t;
190 
191 /*
192  * Function prototypes
193  */
194 static void usage(char *name);
195 
196 static void daemonize(void);
197 
198 static void Interrupt_handler(int sig);
199 
200 static void SetPriv(char *userid, char *groupid );
201 
202 static pcap_dev_t *setup_pcap_live(char *device, char *filter, int snaplen);
203 
204 static pcap_dev_t *setup_pcap_Ffile(FILE *fp, char *filter, int snaplen);
205 
206 static pcap_dev_t *setup_pcap_file(char *pcap_file, char *filter, int snaplen);
207 
208 static void WaitDone(void);
209 
210 static void SignalThreadTerminate(thread_info_t *thread_info, pthread_cond_t *thread_cond );
211 
212 static void *p_pcap_flush_thread(void *thread_data);
213 
214 static void *p_flow_thread(void *thread_data);
215 
216 static void *p_packet_thread(void *thread_data);
217 
218 /*
219  * Functions
220  */
221 
usage(char * name)222 static void usage(char *name) {
223 	printf("usage %s [options] [\"pcap filter\"]\n"
224 					"-h\t\tthis text you see right here\n"
225 					"-u userid\tChange user to username\n"
226 					"-g groupid\tChange group to groupname\n"
227 					"-i interface\tread packets from interface\n"
228 					"-r pcapfile\tread packets from file\n"
229 					"-B num\tset the node cache size. (default 524288)\n"
230 					"-s snaplen\tset the snapshot length - default 1526\n"
231 					"-e active,inactive\tset the active,inactive flow expire time (s) - default 300,60\n"
232 					"-l flowdir \tset the flow output directory. (no default) \n"
233 					"-p pcapdir \tset the pcapdir directory. (optional) \n"
234 					"-S subdir\tSub directory format. see nfcapd(1) for format\n"
235 					"-I Ident\tset the ident string for stat file. (default 'none')\n"
236 					"-P pidfile\tset the PID file\n"
237 					"-t time frame\tset the time window to rotate pcap/nfcapd file\n"
238 					"-z\t\tLZO compress flows in output file.\n"
239 					"-y\t\tLZ4 compress flows in output file.\n"
240 					"-j\t\tBZ2 compress flows in output file.\n"
241 					"-E\t\tPrint extended format of netflow data. for debugging purpose only.\n"
242 					"-T\t\tInclude extension tags in records.\n"
243 					"-D\t\tdetach from terminal (daemonize)\n"
244 	, name);
245 } // End of usage
246 
Interrupt_handler(int sig)247 static void Interrupt_handler(int sig) {
248 pthread_t tid		 = pthread_self();
249 thread_info_t	*thread_info;
250 
251 	thread_info = (thread_info_t *)pthread_getspecific(buffer_key);
252 	if ( !thread_info ) {
253 		LogError("[%lu] Interrupt_handler() failed to get thread specific data block", (long unsigned)tid);
254 	} else {
255 		if ( thread_info->tid != tid ) {
256 			LogError("[%lu] Interrupt_handler() missmatch tid in thread_info", (long unsigned)tid);
257 		} else {
258 			thread_info->done = 1;
259 		}
260 	}
261 
262 } // End of signal_handler
263 
daemonize(void)264 static void daemonize(void) {
265 int fd;
266 	switch (fork()) {
267 		case 0:
268 			// child
269 			break;
270 		case -1:
271 			// error
272 			fprintf(stderr, "fork() error: %s\n", strerror(errno));
273 			exit(0);
274 			break;
275 		default:
276 			// parent
277 			_exit(0);
278 	}
279 
280 	if (setsid() < 0) {
281 		fprintf(stderr, "setsid() error: %s\n", strerror(errno));
282 		exit(0);
283 	}
284 
285 	// Double fork
286 	switch (fork()) {
287 		case 0:
288 			// child
289 			break;
290 		case -1:
291 			// error
292 			fprintf(stderr, "fork() error: %s\n", strerror(errno));
293 			exit(0);
294 			break;
295 		default:
296 			// parent
297 			_exit(0);
298 	}
299 
300 	fd = open("/dev/null", O_RDONLY);
301 	if (fd != 0) {
302 		dup2(fd, 0);
303 		close(fd);
304 	}
305 	fd = open("/dev/null", O_WRONLY);
306 	if (fd != 1) {
307 		dup2(fd, 1);
308 		close(fd);
309 	}
310 	fd = open("/dev/null", O_WRONLY);
311 	if (fd != 2) {
312 		dup2(fd, 2);
313 		close(fd);
314 	}
315 
316 } // End of daemonize
317 
SetPriv(char * userid,char * groupid)318 static void SetPriv(char *userid, char *groupid ) {
319 struct 	passwd *pw_entry;
320 struct 	group *gr_entry;
321 uid_t	myuid, newuid, newgid;
322 int		err;
323 
324 	if ( userid == 0 && groupid == 0 )
325 		return;
326 
327 	newuid = newgid = 0;
328 	myuid = getuid();
329 	if ( myuid != 0 ) {
330 		LogError("Only root wants to change uid/gid");
331 		fprintf(stderr, "ERROR: Only root wants to change uid/gid\n");
332 		exit(255);
333 	}
334 
335 	if ( userid ) {
336 		pw_entry = getpwnam(userid);
337 		newuid = pw_entry ? pw_entry->pw_uid : atol(userid);
338 
339 		if ( newuid == 0 ) {
340 			fprintf (stderr,"Invalid user '%s'\n", userid);
341 			exit(255);
342 		}
343 	}
344 
345 	if ( groupid ) {
346 		gr_entry = getgrnam(groupid);
347 		newgid = gr_entry ? gr_entry->gr_gid : atol(groupid);
348 
349 		if ( newgid == 0 ) {
350 			fprintf (stderr,"Invalid group '%s'\n", groupid);
351 			exit(255);
352 		}
353 
354 		err = setgid(newgid);
355 		if ( err ) {
356 			LogError("Can't set group id %ld for group '%s': %s",   (long)newgid, groupid, strerror(errno));
357 			fprintf (stderr,"Can't set group id %ld for group '%s': %s\n", (long)newgid, groupid, strerror(errno));
358 			exit(255);
359 		}
360 
361 	}
362 
363 	if ( newuid ) {
364 		err = setuid(newuid);
365 		if ( err ) {
366 			LogError("Can't set user id %ld for user '%s': %s",   (long)newuid, userid, strerror(errno));
367 			fprintf (stderr,"Can't set user id %ld for user '%s': %s\n", (long)newuid, userid, strerror(errno));
368 			exit(255);
369 		}
370 	}
371 
372 } // End of SetPriv
373 
setup_pcap_live(char * device,char * filter,int snaplen)374 static pcap_dev_t *setup_pcap_live(char *device, char *filter, int snaplen) {
375 pcap_t 		*handle    = NULL;
376 pcap_dev_t	*pcap_dev  = NULL;
377 pcap_if_t	*alldevsp = NULL;
378 char errbuf[PCAP_ERRBUF_SIZE];
379 bpf_u_int32 mask;		/* Our netmask */
380 bpf_u_int32 net;		/* Our IP */
381 struct bpf_program filter_code;
382 uint32_t	linkoffset, linktype;
383 
384 	dbg_printf("Enter function: %s\n", __FUNCTION__);
385 
386 	if (device == NULL) {
387 		if ( pcap_findalldevs(&alldevsp, errbuf) == -1 ) {
388 			LogError("pcap_findalldevs() error: %s in %s line %d",
389 				errbuf, __FILE__, __LINE__);
390 			return NULL;
391 		}
392 		if ( alldevsp == NULL ) {
393 			LogError("Couldn't find default device");
394 			return NULL;
395 		}
396 		device = alldevsp[0].name;
397 		LogInfo("Listen on %s", device);
398 	}
399 
400 	/* Find the properties for the device */
401 	if (pcap_lookupnet(device, &net, &mask, errbuf) == -1) {
402 		LogError("Couldn't get netmask for device %s: %s", device, errbuf);
403 		net = 0;
404 		mask = 0;
405 	}
406 
407 	/*
408 	 *  Open the packet capturing device with the following values:
409 	 *
410 	 *  PROMISC: on
411 	 *  The interface needs to be in promiscuous mode to capture all
412 	 *  network traffic on the localnet.
413 	 *  TIMEOUT: 500ms
414 	 *  A 500 ms timeout is probably fine for most networks.  For
415 	 *  architectures that support it, you might want tune this value
416 	 *  depending on how much traffic you're seeing on the network.
417 	 */
418 	handle = pcap_open_live(device, snaplen, PROMISC, TIMEOUT, errbuf);
419 	if (handle == NULL) {
420 		LogError("Couldn't open device %s: %s", device, errbuf);
421 		return NULL;
422 	}
423 
424 	// XXX
425 	// int pcap_set_buffer_size(pcap_t *p, int buffer_size);
426 
427 	if ( filter ) {
428 		/* Compile and apply the filter */
429 		if (pcap_compile(handle, &filter_code, filter, 0, net) == -1) {
430 			LogError("Couldn't parse filter %s: %s", filter, pcap_geterr(handle));
431 			return NULL;
432 		}
433 		if (pcap_setfilter(handle, &filter_code) == -1) {
434 			LogError("Couldn't install filter %s: %s", filter, pcap_geterr(handle));
435 			return NULL;
436 		}
437 	}
438 
439 	pcap_dev = (pcap_dev_t *)calloc(1, sizeof(pcap_dev_t));
440 	if ( !pcap_dev ) {
441 		LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
442 		return NULL;
443 	}
444 
445 	linkoffset = 0;
446 	linktype   = pcap_datalink(handle);
447 	switch ( linktype ) {
448 		case DLT_RAW:
449 			linkoffset = 0;
450 			break;
451 		case DLT_PPP:
452 			linkoffset = 2;
453 			break;
454 		case DLT_NULL:
455 			linkoffset = 4;
456 			break;
457 		case DLT_LOOP:
458 			linkoffset = 14;
459 			break;
460 		case DLT_EN10MB:
461 			linkoffset = 14;
462 			break;
463 		case DLT_LINUX_SLL:
464 			linkoffset = 16;
465 			break;
466 		case DLT_IEEE802_11:
467 			linkoffset = 22;
468 			break;
469 		default:
470 			LogError("Unsupported data link type %i", linktype);
471 			return NULL;
472 	}
473 
474 	pcap_dev->handle	 = handle;
475 	pcap_dev->snaplen	 = snaplen;
476 	pcap_dev->linkoffset = linkoffset;
477 	pcap_dev->linktype 	 = linktype;
478 
479 	return pcap_dev;
480 
481 } // End of setup_pcap_live
482 
setup_pcap_file(char * pcap_file,char * filter,int snaplen)483 static pcap_dev_t *setup_pcap_file(char *pcap_file, char *filter, int snaplen) {
484 FILE *fp;
485 
486 	fp = fopen(pcap_file, "rb");
487 	if ( !fp ) {
488 		LogError("Couldn't open file: %s: %s", pcap_file, strerror(errno));
489 		return NULL;
490 	}
491 	return setup_pcap_Ffile(fp, filter, snaplen);
492 
493 } // End of setup_pcap_file
494 
setup_pcap_Ffile(FILE * fp,char * filter,int snaplen)495 static pcap_dev_t *setup_pcap_Ffile(FILE *fp, char *filter, int snaplen) {
496 pcap_t 		*handle;
497 pcap_dev_t	*pcap_dev;
498 char errbuf[PCAP_ERRBUF_SIZE];
499 uint32_t	linkoffset, linktype;
500 
501 	dbg_printf("Enter function: %s\n", __FUNCTION__);
502 
503 	if ( !fp )
504 		return NULL;
505 
506 	handle = pcap_fopen_offline(fp, errbuf);
507 	if (handle == NULL) {
508 		LogError("Couldn't attach FILE handle %s", errbuf);
509 		return NULL;
510 	}
511 
512 	if ( filter ) {
513 		struct bpf_program filter_code;
514 		bpf_u_int32 netmask = 0;
515 		// Compile and apply the filter
516 		if (pcap_compile(handle, &filter_code, filter, 0, netmask) == -1) {
517 			LogError("Couldn't parse filter %s: %s", filter, pcap_geterr(handle));
518 			return NULL;
519 		}
520 		if (pcap_setfilter(handle, &filter_code) == -1) {
521 			LogError("Couldn't install filter %s: %s", filter, pcap_geterr(handle));
522 			return NULL;
523 		}
524 	}
525 
526 	linkoffset = 0;
527 	linktype   = pcap_datalink(handle);
528 	switch ( linktype ) {
529 		case DLT_RAW:
530 			linkoffset = 0;
531 			break;
532 		case DLT_PPP:
533 			linkoffset = 2;
534 			break;
535 		case DLT_NULL:
536 			linkoffset = 4;
537 			break;
538 		case DLT_LOOP:
539 			linkoffset = 14;
540 			break;
541 		case DLT_EN10MB:
542 			linkoffset = 14;
543 			break;
544 		case DLT_LINUX_SLL:
545 			linkoffset = 16;
546 			break;
547 		case DLT_IEEE802_11:
548 			linkoffset = 22;
549 			break;
550 		default:
551 			LogError("Unsupported data link type %i", linktype);
552 			return NULL;
553 	}
554 
555 	pcap_dev = (pcap_dev_t *)calloc(1, sizeof(pcap_dev_t));
556 	if ( !pcap_dev ) {
557 		LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
558 		return NULL;
559 	}
560 
561 	pcap_dev->handle	 = handle;
562 	pcap_dev->snaplen	 = snaplen;
563 	pcap_dev->linkoffset = linkoffset;
564 	pcap_dev->linktype	 = linktype;
565 
566 	return pcap_dev;
567 
568 } // End of setup_pcap_file
569 
SignalThreadTerminate(thread_info_t * thread_info,pthread_cond_t * thread_cond)570 static void SignalThreadTerminate(thread_info_t *thread_info, pthread_cond_t *thread_cond ) {
571 
572 	if ( !thread_info->done ) {
573 		dbg_printf("Signal thread[%lu] to terminate\n", (long unsigned)thread_info->tid);
574     	if ( pthread_kill(thread_info->tid, SIGUSR2) != 0 ) {
575 			dbg_printf("Failed to signal thread[%lu]\n", (long unsigned)thread_info->tid);
576 		}
577 
578 		// in case of a condition - signal condition
579 		if ( thread_cond )
580 			pthread_cond_signal(thread_cond);
581 
582 	} else {
583 		dbg_printf("thread[%lu] gone already\n", (long unsigned)thread_info->tid);
584 	}
585 
586    	if( pthread_join(thread_info->tid, NULL) == 0 ) {
587        	dbg_printf("thread %lu joined\n", (long unsigned)thread_info->tid);
588 	} else {
589        	dbg_printf("thread %lu no join\n", (long unsigned)thread_info->tid);
590 	}
591 
592 	LogInfo("Exit status thread[%lu]: %i", thread_info->tid, thread_info->exit);
593 
594 } // End of SignalThreadEnd
595 
p_flow_thread(void * thread_data)596 __attribute__((noreturn)) static void *p_flow_thread(void *thread_data) {
597 // argument dispatching
598 p_flow_thread_args_t *args = (p_flow_thread_args_t *)thread_data;
599 time_t t_win		 = args->t_win;
600 int subdir_index	 = args->subdir_index;
601 int compress	 	 = args->compress;
602 int live		 	 = args->live;
603 FlowSource_t *fs	 = args->fs;
604 char *time_extension = args->time_extension;
605 static time_t lastExpire = 0;
606 time_t t_start, t_clock;
607 int err, done;
608 
609 	done 	   = 0;
610 	args->done = 0;
611 	args->exit = 0;
612 
613 	err = pthread_setspecific( buffer_key, (void *)args );
614 	if ( err ) {
615 		LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
616 			(long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
617 		args->done = 1;
618 		args->exit = 255;
619    		pthread_kill(args->parent, SIGUSR1);
620 		pthread_exit((void *)args);
621 	}
622 
623 	if ( !Init_pcap2nf() ) {
624 		args->done = 1;
625 		args->exit = 255;
626    		pthread_kill(args->parent, SIGUSR1);
627 		pthread_exit((void *)args);
628 	}
629 
630 	// prepare file
631 	fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
632 	if ( !fs->nffile ) {
633 		args->done = 1;
634 		args->exit = 255;
635    		pthread_kill(args->parent, SIGUSR1);
636 		pthread_exit((void *)args);
637 	}
638 
639 	// init vars
640 	fs->bad_packets		= 0;
641 	fs->first_seen      = 0xffffffffffffLL;
642 	fs->last_seen 		= 0;
643 
644 	t_start = 0;
645 	t_clock = 0;
646 	while ( 1 ) {
647 		struct FlowNode	*Node;
648 
649 		Node = Pop_Node(args->NodeList, &args->done);
650 		if ( Node ) {
651 			t_clock = Node->t_last.tv_sec;
652 			dbg_printf("p_flow_thread() Next Node\n");
653 		} else {
654 			done = args->done;
655 			dbg_printf("p_flow_thread() NULL Node\n");
656 		}
657 
658 		if ( t_start == 0 ) {
659 			t_start = t_clock - (t_clock % t_win);
660 		}
661 
662 		if (((t_clock - t_start) >= t_win) || done) { /* rotate file */
663 			struct tm *when;
664 			nffile_t *nffile;
665 			char FullName[MAXPATHLEN];
666 			char netflowFname[128];
667 			char error[256];
668 			char *subdir, fmt[24];
669 			uint32_t NumFlows;
670 
671 			// flush all flows to disk
672 			DumpNodeStat(args->NodeList);
673 			if (done)
674 				NumFlows = Flush_FlowTree(fs);
675 			else
676 				NumFlows = Expire_FlowTree(fs, t_clock);
677 
678 			when = localtime(&t_start);
679 			strftime(fmt, sizeof(fmt), time_extension, when);
680 
681 			nffile = fs->nffile;
682 
683 			// prepare sub dir hierarchy
684 			if ( subdir_index ) {
685 				subdir = GetSubDir(when);
686 				if ( !subdir ) {
687 					// failed to generate subdir path - put flows into base directory
688 					LogError("Failed to create subdir path!");
689 
690 					// failed to generate subdir path - put flows into base directory
691 					subdir = NULL;
692 					snprintf(netflowFname, 127, "nfcapd.%s", fmt);
693 				} else {
694 					snprintf(netflowFname, 127, "%s/nfcapd.%s", subdir, fmt);
695 				}
696 
697 			} else {
698 				subdir = NULL;
699 				snprintf(netflowFname, 127, "nfcapd.%s", fmt);
700 			}
701 			netflowFname[127] = '\0';
702 
703 			if ( subdir && !SetupSubDir(fs->datadir, subdir, error, 255) ) {
704 				// in this case the flows get lost! - the rename will fail
705 				// but this should not happen anyway, unless i/o problems, inode problems etc.
706 				LogError("Ident: %s, Failed to create sub hier directories: %s", fs->Ident, error );
707 			}
708 
709 			if ( nffile->block_header->NumRecords ) {
710 				// flush current buffer to disc
711 				if ( WriteBlock(nffile) <= 0 )
712 					LogError("Ident: %s, failed to write output buffer to disk: '%s'" , fs->Ident, strerror(errno));
713 			} // else - no new records in current block
714 
715 			// prepare full filename
716 			snprintf(FullName, MAXPATHLEN-1, "%s/%s", fs->datadir, netflowFname);
717 			FullName[MAXPATHLEN-1] = '\0';
718 
719 			// update stat record
720 			// if no flows were collected, fs->last_seen is still 0
721 			// set first_seen to start of this time slot, with twin window size.
722 			if ( fs->last_seen == 0 ) {
723 				fs->first_seen = (uint64_t)1000 * (uint64_t)t_start;
724 				fs->last_seen  = (uint64_t)1000 * (uint64_t)(t_start + t_win);
725 			}
726 			nffile->stat_record->first_seen = fs->first_seen/1000;
727 			nffile->stat_record->msec_first	= fs->first_seen - nffile->stat_record->first_seen*1000;
728 			nffile->stat_record->last_seen 	= fs->last_seen/1000;
729 			nffile->stat_record->msec_last	= fs->last_seen - nffile->stat_record->last_seen*1000;
730 
731 			// Flush Exporter Stat to file
732 			FlushExporterStats(fs);
733 			// Close file
734 			CloseUpdateFile(nffile, fs->Ident);
735 
736 			// if rename fails, we are in big trouble, as we need to get rid of the old .current file
737 			// otherwise, we will loose flows and can not continue collecting new flows
738 			if ( !RenameAppend(fs->current, FullName) ) {
739 				LogError("Ident: %s, Can't rename dump file: %s", fs->Ident,  strerror(errno));
740 				LogError("Ident: %s, Serious Problem! Fix manually", fs->Ident);
741 	/* XXX
742 				if ( launcher_pid )
743 					commbuff->failed = 1;
744 	*/
745 				// we do not update the books here, as the file failed to rename properly
746 				// otherwise the books may be wrong
747 			} else {
748 				struct stat	fstat;
749 	/* XXX
750 				if ( launcher_pid )
751 					commbuff->failed = 0;
752 	*/
753 				// Update books
754 				stat(FullName, &fstat);
755 				UpdateBooks(fs->bookkeeper, t_start, 512*fstat.st_blocks);
756 			}
757 
758 			LogInfo("Ident: '%s' Flows: %llu, Packets: %llu, Bytes: %llu, Max Flows: %u, Fragments: %u",
759 				fs->Ident, (unsigned long long)nffile->stat_record->numflows, (unsigned long long)nffile->stat_record->numpackets,
760 				(unsigned long long)nffile->stat_record->numbytes, NumFlows, IPFragEntries());
761 
762 			// reset stats
763 			fs->bad_packets = 0;
764 			fs->first_seen  = 0xffffffffffffLL;
765 			fs->last_seen 	= 0;
766 
767 			// Dump all extension maps and exporters to the buffer
768 			FlushStdRecords(fs);
769 
770 			if ( done )
771 				break;
772 
773 			t_start = t_clock - (t_clock % t_win);
774 
775 			nffile = OpenNewFile(fs->current, nffile, compress, 0, NULL);
776 			if ( !nffile ) {
777 				LogError("Fatal: OpenNewFile() failed for ident: %s", fs->Ident);
778 				args->done = 1;
779 				args->exit = 255;
780    				pthread_kill(args->parent, SIGUSR1);
781 				break;
782 			}
783 		}
784 
785 		time_t when;
786 		if ( Node ) {
787 			if ( Node->fin != SIGNAL_NODE ) {
788 				// Process the Node
789 				ProcessFlowNode(fs, Node);
790 			}
791 			when = Node->t_last.tv_sec;
792 		} else {
793 			when = time(NULL);
794 		}
795 		if ( (when - lastExpire) > EXPIREINTERVALL ) {
796 			Expire_FlowTree(fs, when);
797 			lastExpire = when;
798 		}
799 		CacheCheck(fs, when, live);
800 
801 	}
802 
803 	while ( fs ) {
804 		DisposeFile(fs->nffile);
805 		fs = fs->next;
806 	}
807 	LogInfo("Terminating flow processng: exit: %i", args->exit);
808 	dbg_printf("End flow thread[%lu]\n", (long unsigned)args->tid);
809 
810 	pthread_exit((void *)args);
811 	/* NOTREACHED */
812 
813 } // End of p_flow_thread
814 
p_pcap_flush_thread(void * thread_data)815 __attribute__((noreturn)) static void *p_pcap_flush_thread(void *thread_data) {
816 // argument dispatching
817 p_pcap_flush_thread_args_t *args = (p_pcap_flush_thread_args_t *)thread_data;
818 char *pcap_datadir	 = args->pcap_datadir;
819 pcap_dev_t *pcap_dev = args->pcap_dev;
820 pcapfile_t *pcapfile = args->pcapfile;
821 char *time_extension = args->time_extension;
822 char pcap_dumpfile[MAXPATHLEN];
823 int err;
824 int runs = 0;
825 
826 	dbg_printf("New flush thread[%lu]\n", (long unsigned)args->tid);
827 	args->done = 0;
828 	args->exit = 0;
829 
830 	err = pthread_setspecific( buffer_key, (void *)args );
831 	if ( err ) {
832 		LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
833 			(long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
834 		args->done = 1;
835 		args->exit = 255;
836    		pthread_kill(args->parent, SIGUSR1);
837 		pthread_exit((void *)args);
838 	}
839 
840 	snprintf(pcap_dumpfile, MAXPATHLEN-1, "%s/%s.%lu", pcap_datadir , PCAP_DUMPFILE, (unsigned long)getpid() );
841 	pcapfile = OpenNewPcapFile(pcap_dev->handle, pcap_dumpfile, pcapfile);
842 	if ( !pcapfile ) {
843 		args->done = 1;
844 		args->exit = 255;
845    		pthread_kill(args->parent, SIGUSR1);
846 		pthread_exit((void *)args);
847 		/* NOTREACHED */
848 	}
849 
850 
851 	// wait for alternate buffer to be ready to flush
852 	while ( 1 ) {
853     	pthread_mutex_lock(&pcapfile->m_pbuff);
854     	while ( pcapfile->alternate_size == 0 && !args->done ) {
855         	pthread_cond_wait(&pcapfile->c_pbuff, &pcapfile->m_pbuff);
856     	}
857 		dbg_printf("Flush cycle\n");
858 		runs++;
859 		// try to flush alternate buffer
860 		if ( pcapfile->alternate_size ) {
861 			// flush alternate buffer
862 			dbg_printf("Flush alternate\n");
863     		if ( write(pcapfile->pfd, (void *)pcapfile->alternate_buffer, pcapfile->alternate_size) <= 0 ) {
864         		LogError("write() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
865     		}
866 			pcapfile->alternate_size = 0;
867 		}
868 
869 		// if we are done, try to flsuh main data buffer
870 		if ( args->done && pcapfile->data_size ) {
871 			dbg_printf("Done: Flush all buffers\n");
872 			// flush alternate buffer
873     		if ( write(pcapfile->pfd, (void *)pcapfile->data_buffer, pcapfile->data_size) <= 0 ) {
874         		LogError("write() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
875     		}
876 			pcapfile->data_size = 0;
877 			pcapfile->data_ptr  = pcapfile->data_buffer;
878 		}
879 
880 		// check if we need to rotate/close the file
881 		if ( args->done || pcapfile->t_CloseRename ) { /* rotate file */
882 			struct tm *when;
883 			char FullName[MAXPATHLEN];
884 			char pcapFname[128];
885 			char error[256];
886 			char *subdir, fmt[24];
887 			int err;
888 
889 			dbg_printf("Flush rotate file\n");
890 			when = localtime(&pcapfile->t_CloseRename);
891 			strftime(fmt, sizeof(fmt), time_extension, when);
892 
893 			pcapfile->t_CloseRename = 0;
894 
895 			// prepare sub dir hierarchy
896 			if ( args->subdir_index ) {
897 				subdir = GetSubDir(when);
898 				if ( !subdir ) {
899 					// failed to generate subdir path - put flows into base directory
900 					LogError("Failed to create subdir path!");
901 
902 					// failed to generate subdir path - put flows into base directory
903 					subdir = NULL;
904 					snprintf(pcapFname, 127, "pcapd.%s", fmt);
905 				} else {
906 					snprintf(pcapFname, 127, "%s/pcapd.%s", subdir, fmt);
907 				}
908 
909 			} else {
910 				subdir = NULL;
911 				snprintf(pcapFname, 127, "pcapd.%s", fmt);
912 			}
913 			pcapFname[127] 	  = '\0';
914 
915 			if ( subdir && !SetupSubDir(pcap_datadir, subdir, error, 255) ) {
916 				// in this case the flows get lost! - the rename will fail
917 				// but this should not happen anyway, unless i/o problems, inode problems etc.
918 				LogError("p_packet_thread() Failed to create sub hier directories: %s", error );
919 			}
920 
921 			// prepare full filename
922 			snprintf(FullName, MAXPATHLEN-1, "%s/%s", pcap_datadir, pcapFname);
923 			FullName[MAXPATHLEN-1] = '\0';
924 
925 			ClosePcapFile(pcapfile);
926 			err = rename(pcap_dumpfile, FullName);
927 			if (err) {
928 				LogError("rename() pcap failed in %s line %d: %s", __FILE__, __LINE__, strerror(errno) );
929 			}
930 			dbg_printf("Rotate file: %s -> %s\n", pcap_dumpfile, FullName);
931 
932 			if ( args->done ) {
933     			pthread_mutex_unlock(&pcapfile->m_pbuff);
934 				pthread_cond_signal(&pcapfile->c_pbuff);
935 				break;
936 			}
937 
938 			// open new files
939 			pcapfile = OpenNewPcapFile(pcap_dev->handle, pcap_dumpfile, pcapfile);
940 			if (!pcapfile) {
941 				args->done = 1;
942 				args->exit = 255;
943    				pthread_kill(args->parent, SIGUSR1);
944     			pthread_mutex_unlock(&pcapfile->m_pbuff);
945 				pthread_cond_signal(&pcapfile->c_pbuff);
946 				break;
947 			}
948 
949 		}
950 		dbg_printf("Flush cycle done\n");
951     	pthread_mutex_unlock(&pcapfile->m_pbuff);
952 		pthread_cond_signal(&pcapfile->c_pbuff);
953 	}
954 
955 	dbg_printf("End flush thread[%lu]: %i runs\n", (long unsigned)args->tid, runs);
956 	pthread_exit((void *)args);
957 	/* NOTREACHED */
958 
959 } // End of p_pcap_flush_thread
960 
p_packet_thread(void * thread_data)961 __attribute__((noreturn)) static void *p_packet_thread(void *thread_data) {
962 // argument dispatching
963 p_packet_thread_args_t *args = (p_packet_thread_args_t *)thread_data;
964 pcap_dev_t *pcap_dev = args->pcap_dev;
965 time_t t_win		 = args->t_win;
966 char *pcap_datadir 	 = args->pcap_datadir;
967 char *time_extension = args->time_extension;
968 int subdir_index	 = args->subdir_index;
969 int live		 	 = args->live;
970 // locals
971 p_pcap_flush_thread_args_t p_flush_thread_args;
972 pcapfile_t *pcapfile;
973 time_t t_start;
974 int err;
975 
976 	dbg_printf("New packet thread[%lu]\n", (long unsigned)args->tid);
977 	args->done = 0;
978 	args->exit = 0;
979 
980 	err = pthread_setspecific( buffer_key, (void *)args );
981 	if ( err ) {
982 		LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
983 			(long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
984 		args->done = 1;
985 		args->exit = 255;
986    		pthread_kill(args->parent, SIGUSR1);
987 		pthread_exit((void *)args);
988 		/* NOTREACHED */
989 	}
990 
991 	/* start flush and pcap file handler thread */
992 	if ( pcap_datadir ) {
993 		// just allocate pcapfile and buffers - we need to share pcapfile
994 		pcapfile = OpenNewPcapFile(pcap_dev->handle, NULL, NULL);
995 		if ( !pcapfile ) {
996 			args->done = 1;
997 			args->exit = 255;
998    			pthread_kill(args->parent, SIGUSR1);
999 			pthread_exit((void *)args);
1000 			/* NOTREACHED */
1001 		}
1002 
1003 		p_flush_thread_args.done		   = 0;
1004 		p_flush_thread_args.exit		   = 0;
1005 		p_flush_thread_args.parent		   = args->tid;
1006 		p_flush_thread_args.pcap_dev	   = args->pcap_dev;
1007 		p_flush_thread_args.subdir_index   = subdir_index;
1008 		p_flush_thread_args.pcap_datadir   = pcap_datadir;
1009 		p_flush_thread_args.time_extension = time_extension;
1010 		p_flush_thread_args.pcapfile	   = pcapfile;
1011 
1012 		err = pthread_create(&p_flush_thread_args.tid, NULL, p_pcap_flush_thread, (void *)&p_flush_thread_args);
1013 		if ( err ) {
1014 			LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1015 			args->done = 1;
1016 			args->exit = 255;
1017    			pthread_kill(args->parent, SIGUSR1);
1018 			pthread_exit((void *)args);
1019 		}
1020 		dbg_printf("Started flush thread[%lu]\n", (long unsigned)p_flush_thread_args.tid);
1021 
1022 	} else {
1023 		pcapfile = NULL;
1024 	}
1025 
1026 	err = 0;
1027 	t_start = 0;
1028 	while ( 1 ) {
1029 		struct pcap_pkthdr *hdr;
1030 		const u_char 	   *data;
1031 		struct timeval tv;
1032 		time_t t_clock;
1033 		int ret;
1034 
1035 		if ( !args->done ) {
1036 			ret = pcap_next_ex(pcap_dev->handle, &hdr, &data);
1037 			t_clock = 0;
1038 			switch (ret) {
1039 				case 1: {
1040 					// packet read ok
1041 					t_clock = hdr->ts.tv_sec;
1042 					// process packet for flow cache
1043 					ProcessPacket(args->NodeList, pcap_dev, hdr, data);
1044 					if ( pcap_datadir ) {
1045 						// keep the packet
1046 						if (((t_clock - t_start) >= t_win)) {
1047 							// first packet or rotate file
1048 							if ( t_start != 0 ) {
1049 								RotateFile(pcapfile, t_start, live);
1050 							}
1051 							// if first packet - set t_start here
1052 							t_start = t_clock - (t_clock % t_win);
1053 						}
1054 						PcapDump(pcapfile, hdr, data);
1055 					}
1056 					} break;
1057 				case 0: {
1058 					// live capture idle cycle
1059 					dbg_printf("pcap_next_ex() read live - timeout\n");
1060 					gettimeofday(&tv, NULL);
1061 					t_clock = tv.tv_sec;
1062 					if ((t_clock - t_start) >= t_win) { /* rotate file */
1063 						if ( t_start ) {
1064 							// if not first packet, where t_start = 0
1065 							struct FlowNode	*Node = New_Node();
1066 							Node->t_first = tv;
1067 							Node->t_last  = tv;
1068 							Node->fin  	  = SIGNAL_NODE;
1069 							Push_Node(args->NodeList, Node);
1070 							if ( pcap_datadir ) {
1071 								// keep the packet
1072 								RotateFile(pcapfile, t_start, live);
1073 							}
1074 							LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1075 								pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1076 								pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1077 						}
1078 						if ( live ) {
1079 							struct pcap_stat p_stat;
1080 							if( pcap_stats(pcap_dev->handle, &p_stat) < 0) {
1081 								LogInfo("pcap_stats() failed: %s", pcap_geterr(pcapfile->p));
1082 							} else {
1083 								LogInfo("Dropped: %u, dropped by interface: %u ",
1084 									p_stat.ps_drop, p_stat.ps_ifdrop );
1085 							}
1086 						}
1087 						t_start = t_clock - (t_clock % t_win);
1088 						memset((void *)&(pcap_dev->proc_stat), 0, sizeof(proc_stat_t));
1089 					}
1090 					} break;
1091 				case -1:
1092 					// signal error reading the packet
1093     				err = 1;
1094 					LogError("pcap_next_ex() read error: '%s'", pcap_geterr(pcap_dev->handle));
1095 					args->done = 1;
1096 					continue;
1097 					break;
1098 				case -2: // End of packet file
1099 					// signal parent, job is done
1100     				err = 1;
1101 					LogInfo("pcap_next_ex() end of file");
1102 					args->done = 1;
1103 					LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1104 						pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1105 						pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1106 					continue;
1107 					break;
1108 				default:
1109     				err = 1;
1110 					pcap_breakloop(pcap_dev->handle);
1111 					LogError("Unexpected pcap_next_ex() return value: %i", ret);
1112 					args->done = 1;
1113 					continue;
1114 			}
1115 
1116 		}
1117 
1118 		if ( args->done )
1119 			break;
1120 
1121 	}
1122 
1123 	if ( pcap_datadir ) {
1124 		dbg_printf("Wait for flush thread to complete\n");
1125    		pthread_mutex_lock(&pcapfile->m_pbuff);
1126     	while ( pcapfile->alternate_size ) {
1127         	pthread_cond_wait(&pcapfile->c_pbuff, &pcapfile->m_pbuff);
1128     	}
1129 		pcapfile->t_CloseRename = t_start;
1130    		pthread_mutex_unlock(&pcapfile->m_pbuff);
1131 		dbg_printf("Wait done.\n");
1132 
1133 		LogInfo("Signal flush thread[%lu] to terminate", p_flush_thread_args.tid);
1134 		SignalThreadTerminate((thread_info_t *)&p_flush_thread_args, &pcapfile->c_pbuff);
1135 	}
1136 
1137 	if ( err )
1138   		pthread_kill(args->parent, SIGUSR1);
1139 
1140 	LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1141 		pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1142 		pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1143 	LogInfo("Terminating packet dumping: exit: %i", args->exit);
1144 	dbg_printf("End packet thread[%lu]\n", (long unsigned)args->tid);
1145 
1146 	pthread_exit((void *)args);
1147 	/* NOTREACHED */
1148 
1149 } /* End of p_packet_thread */
1150 
WaitDone(void)1151 static void WaitDone(void) {
1152 sigset_t signal_set;
1153 int done, sig;
1154 pthread_t tid   = pthread_self();
1155 
1156 	LogInfo("[%lu] WaitDone() waiting", (long unsigned)tid);
1157 
1158 	sigemptyset(&signal_set);
1159 	sigaddset(&signal_set, SIGINT);
1160 	sigaddset(&signal_set, SIGHUP);
1161 	sigaddset(&signal_set, SIGTERM);
1162 	sigaddset(&signal_set, SIGUSR1);
1163 	pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
1164 
1165 	done = 0;
1166 	while ( !done ) {
1167 		sigwait(&signal_set, &sig);
1168 		LogInfo("[%lu] WaitDone() signal %i", (long unsigned)tid, sig);
1169 		switch ( sig ) {
1170 			case SIGHUP:
1171 				break;
1172 			case SIGINT:
1173 			case SIGTERM:
1174 				pthread_mutex_lock(&m_done);
1175 				done = 1;
1176 				pthread_mutex_unlock(&m_done);
1177 				pthread_cond_signal(&terminate);
1178 				break;
1179 			case SIGUSR1:
1180 				// child signals end of job
1181 				done = 1;
1182 				break;
1183 			// default:
1184 				// empty
1185 		}
1186 	}
1187 
1188 } // End of WaitDone
1189 
main(int argc,char * argv[])1190 int main(int argc, char *argv[]) {
1191 sigset_t			signal_set;
1192 struct sigaction	sa;
1193 int c, snaplen, err, do_daemonize;
1194 int subdir_index, compress, expire, cache_size;
1195 int active, inactive;
1196 FlowSource_t	*fs;
1197 dirstat_t 		*dirstat;
1198 time_t 			t_win;
1199 char 			*device, *pcapfile, *filter, *datadir, *pcap_datadir, *extension_tags, pidfile[MAXPATHLEN], pidstr[32];
1200 char			*Ident, *userid, *groupid;
1201 char			*time_extension;
1202 pcap_dev_t 		*pcap_dev;
1203 p_packet_thread_args_t *p_packet_thread_args;
1204 p_flow_thread_args_t *p_flow_thread_args;
1205 
1206 	snaplen			= 1526;
1207 	do_daemonize	= 0;
1208 	launcher_pid	= 0;
1209 	device			= NULL;
1210 	pcapfile		= NULL;
1211 	filter			= NULL;
1212 	pidfile[0]		= '\0';
1213 	t_win			= TIME_WINDOW;
1214 	datadir			= DEFAULT_DIR;
1215 	pcap_datadir	= NULL;
1216 	userid			= groupid = NULL;
1217 	Ident			= "none";
1218 	fs				= NULL;
1219 	extension_tags	= DefaultExtensions;
1220 	time_extension	= "%Y%m%d%H%M";
1221 	subdir_index	= 0;
1222 	compress		= NOT_COMPRESSED;
1223 	verbose			= 0;
1224 	expire			= 0;
1225 	cache_size		= 0;
1226 	active			= 0;
1227 	inactive		= 0;
1228 	while ((c = getopt(argc, argv, "B:DEI:e:g:hi:j:r:s:l:p:P:t:u:S:T:Vyz")) != EOF) {
1229 		switch (c) {
1230 			struct stat fstat;
1231 			case 'h':
1232 				usage(argv[0]);
1233 				exit(0);
1234 				break;
1235 			case 'u':
1236 				userid  = optarg;
1237 				break;
1238 			case 'g':
1239 				groupid  = optarg;
1240 				break;
1241 			case 'D':
1242 				do_daemonize = 1;
1243 				break;
1244 			case 'B':
1245 				cache_size = atoi(optarg);
1246 				if (cache_size <= 0) {
1247 					LogError("ERROR: Cache size must not be < 0");
1248 					exit(EXIT_FAILURE);
1249 				}
1250 				break;
1251 			case 'I':
1252 				Ident = strdup(optarg);
1253 				break;
1254 			case 'i':
1255 				device = optarg;
1256 				break;
1257 			case 'l':
1258 				datadir = optarg;
1259 				err  = stat(datadir, &fstat);
1260 				if (!(fstat.st_mode & S_IFDIR)) {
1261 					LogError("No such directory: " "'%s'", datadir);
1262 					break;
1263 				}
1264 				break;
1265 			case 'p':
1266 				pcap_datadir = optarg;
1267 				err  = stat(pcap_datadir, &fstat);
1268 				if (!(fstat.st_mode & S_IFDIR)) {
1269 					LogError("No such directory: " "'%s'", pcap_datadir);
1270 					break;
1271 				}
1272 				break;
1273 			case 'r': {
1274 				struct stat stat_buf;
1275 				pcapfile = optarg;
1276 				if ( stat(pcapfile, &stat_buf) ) {
1277 					LogError("Can't stat '%s': %s", pcapfile, strerror(errno));
1278 					exit(EXIT_FAILURE);
1279 				}
1280 				if (!S_ISREG(stat_buf.st_mode) ) {
1281 					LogError("'%s' is not a file", pcapfile);
1282 					exit(EXIT_FAILURE);
1283 				}
1284 				} break;
1285 			case 's':
1286 				snaplen = atoi(optarg);
1287 				if (snaplen < 14 + 20 + 20) { // ethernet, IP , TCP, no payload
1288 					LogError("ERROR:, snaplen < sizeof IPv4 - Need 54 bytes for TCP/IPv4");
1289 					exit(EXIT_FAILURE);
1290 				}
1291 				break;
1292 			case 'e': {
1293 				if ( strlen(optarg) > 16 ) {
1294 					LogError("ERROR:, size timeout values too big");
1295 					exit(EXIT_FAILURE);
1296 				}
1297 				char *s = strdup(optarg);
1298 				char *sep = strchr(s, ',');
1299 				if ( !sep ) {
1300 					LogError("ERROR:, timeout values format error");
1301 					exit(EXIT_FAILURE);
1302 				}
1303 				*sep = '\0';
1304 				sep++;
1305 				active   = atoi(s);
1306 				inactive = atoi(sep);
1307 				if (snaplen < 14 + 20 + 20) { // ethernet, IP , TCP, no payload
1308 					LogError("ERROR:, snaplen < sizeof IPv4 - Need 54 bytes for TCP/IPv4");
1309 					exit(EXIT_FAILURE);
1310 				}
1311 				} break;
1312 			case 't':
1313 				t_win = atoi(optarg);
1314 				if (t_win < 2) {
1315 					LogError("time interval <= 2s not allowed");
1316 					exit(EXIT_FAILURE);
1317 				}
1318 				if (t_win < 60) {
1319 					time_extension	= "%Y%m%d%H%M%S";
1320 				}
1321 				break;
1322 			case 'j':
1323 				if ( compress ) {
1324 					LogError("Use either -z for LZO or -j for BZ2 compression, but not both\n");
1325 					exit(255);
1326 				}
1327 				compress = BZ2_COMPRESSED;
1328 				break;
1329 			case 'y':
1330 				if ( compress ) {
1331 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
1332 					exit(255);
1333 				}
1334 				compress = LZ4_COMPRESSED;
1335 				break;
1336 			case 'z':
1337 				if ( compress ) {
1338 					LogError("Use either -z for LZO or -j for BZ2 compression, but not both\n");
1339 					exit(255);
1340 				}
1341 				compress = LZO_COMPRESSED;
1342 				break;
1343 			case 'P':
1344 				if ( optarg[0] == '/' ) { 	// absolute path given
1345 					strncpy(pidfile, optarg, MAXPATHLEN-1);
1346 				} else {					// path relative to current working directory
1347 					char tmp[MAXPATHLEN];
1348 					if ( !getcwd(tmp, MAXPATHLEN-1) ) {
1349 						fprintf(stderr, "Failed to get current working directory: %s\n", strerror(errno));
1350 						exit(255);
1351 					}
1352 					tmp[MAXPATHLEN-1] = 0;
1353 					if ( (strlen(tmp) + strlen(optarg) + 3) < MAXPATHLEN ) {
1354 						snprintf(pidfile, MAXPATHLEN - 3 - strlen(tmp), "%s/%s", tmp, optarg);
1355 					} else {
1356 						fprintf(stderr, "pidfile MAXPATHLEN error:\n");
1357 						exit(255);
1358 					}
1359 				}
1360 				// pidfile now absolute path
1361 				pidfile[MAXPATHLEN-1] = 0;
1362 				break;
1363 			case 'S':
1364 				subdir_index = atoi(optarg);
1365 				break;
1366 			case 'T': {
1367 				size_t len = strlen(optarg);
1368 				extension_tags = optarg;
1369 				if ( len == 0 || len > 128 ) {
1370 					fprintf(stderr, "Extension length error. Unexpected option '%s'\n", extension_tags);
1371 					exit(255);
1372 				}
1373 				break; }
1374 			case 'E':
1375 				verbose = 1;
1376 				Setv6Mode(1);
1377 				break;
1378 			case 'V':
1379 				printf("%s: Version: %s\n",argv[0], nfdump_version);
1380 				exit(0);
1381 				break;
1382 			default:
1383 				usage(argv[0]);
1384 				exit(EXIT_FAILURE);
1385 		}
1386 	}
1387 
1388 	if (argc - optind > 1) {
1389 		usage(argv[0]);
1390 		exit(EXIT_FAILURE);
1391 	} else {
1392 		/* user specified a pcap filter */
1393 		filter = argv[optind];
1394 	}
1395 
1396 	if ( fs == NULL && datadir != NULL && !AddDefaultFlowSource(&fs, Ident, datadir) ) {
1397 		fprintf(stderr, "Failed to add default data collector directory\n");
1398 		exit(255);
1399 	}
1400 
1401 	if ( device && pcapfile ) {
1402 		LogError("Specify either a device or a pcapfile, but not both");
1403 		exit(EXIT_FAILURE);
1404 	}
1405 
1406 	if ( !device && !pcapfile ) {
1407 		LogError("Specify either a device or a pcapfile to read packets from");
1408 		exit(EXIT_FAILURE);
1409 	}
1410 
1411 	if ( !Init_FlowTree(cache_size, active, inactive)) {
1412 		LogError("Init_FlowTree() failed.");
1413 		exit(EXIT_FAILURE);
1414 	}
1415 
1416 	InitExtensionMaps(NO_EXTENSION_LIST);
1417 	SetupExtensionDescriptors(strdup(extension_tags));
1418 
1419 	if ( pcapfile ) {
1420 		pcap_dev = setup_pcap_file(pcapfile, filter, snaplen);
1421 	} else {
1422 		pcap_dev = setup_pcap_live(device, filter, snaplen);
1423 	}
1424 	if (!pcap_dev) {
1425 		exit(EXIT_FAILURE);
1426 	}
1427 
1428 	SetPriv(userid, groupid);
1429 
1430 	if ( subdir_index && !InitHierPath(subdir_index) ) {
1431 		pcap_close(pcap_dev->handle);
1432 		exit(255);
1433 	}
1434 
1435 	if ( !InitLog(do_daemonize, argv[0], SYSLOG_FACILITY, verbose) ) {
1436 		pcap_close(pcap_dev->handle);
1437 		exit(255);
1438 	}
1439 
1440 	// check if pid file exists and if so, if a process with registered pid is running
1441 	if ( strlen(pidfile) ) {
1442 		int pidf;
1443 		pidf = open(pidfile, O_RDONLY, 0);
1444 		if ( pidf > 0 ) {
1445 			// pid file exists
1446 			char s[32];
1447 			ssize_t len;
1448 			len = read(pidf, (void *)s, 31);
1449 			close(pidf);
1450 			s[31] = '\0';
1451 			if ( len < 0 ) {
1452 				fprintf(stderr, "read() error existing pid file: %s\n", strerror(errno));
1453 				pcap_close(pcap_dev->handle);
1454 				exit(255);
1455 			} else {
1456 				unsigned long pid = atol(s);
1457 				if ( pid == 0 ) {
1458 					// garbage - use this file
1459 					unlink(pidfile);
1460 				} else {
1461 					if ( kill(pid, 0) == 0 ) {
1462 						// process exists
1463 						fprintf(stderr, "A process with pid %lu registered in pidfile %s is already running!\n",
1464 							pid, strerror(errno));
1465 						pcap_close(pcap_dev->handle);
1466 						exit(255);
1467 					} else {
1468 						// no such process - use this file
1469 						unlink(pidfile);
1470 					}
1471 				}
1472 			}
1473 		} else {
1474 			if ( errno != ENOENT ) {
1475 				fprintf(stderr, "open() error existing pid file: %s\n", strerror(errno));
1476 				pcap_close(pcap_dev->handle);
1477 				exit(255);
1478 			} // else errno == ENOENT - no file - this is fine
1479 		}
1480 	}
1481 
1482 	if ( do_daemonize ) {
1483 		verbose = 0;
1484 		daemonize();
1485 	}
1486 
1487 	if (strlen(pidfile)) {
1488 		pid_t pid = getpid();
1489 		int pidf  = open(pidfile, O_RDWR|O_TRUNC|O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
1490 		if ( pidf == -1 ) {
1491 			LogError("Error opening pid file: '%s' %s", pidfile, strerror(errno));
1492 			pcap_close(pcap_dev->handle);
1493 			exit(255);
1494 		}
1495 		snprintf(pidstr,31,"%lu\n", (unsigned long)pid);
1496 		if ( write(pidf, pidstr, strlen(pidstr)) <= 0 ) {
1497 			LogError("Error write pid file: '%s' %s", pidfile, strerror(errno));
1498 		}
1499 		close(pidf);
1500 	}
1501 
1502 	if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
1503 			LogError("initialize bookkeeper failed.");
1504 			pcap_close(pcap_dev->handle);
1505 			exit(255);
1506 	}
1507 
1508 	// Init the extension map list
1509 	if ( !InitExtensionMapList(fs) ) {
1510 		pcap_close(pcap_dev->handle);
1511 		exit(255);
1512 	}
1513 
1514 	IPFragTree_init();
1515 
1516 	LogInfo("Startup.");
1517 	// prepare signal mask for all threads
1518 	// block signals, as they are handled by the main thread
1519 	// mask is inherited by all threads
1520 	sigemptyset(&signal_set);
1521 	sigaddset(&signal_set, SIGINT);
1522 	sigaddset(&signal_set, SIGHUP);
1523 	sigaddset(&signal_set, SIGTERM);
1524 	sigaddset(&signal_set, SIGUSR1);
1525 	sigaddset(&signal_set, SIGPIPE);
1526 	pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
1527 
1528 	// for USR2 set a signal handler, which interrupts blocking
1529 	// system calls - and signals done event
1530 	// handler applies for all threads in a process
1531 	sa.sa_handler = Interrupt_handler;
1532 	sigemptyset(&sa.sa_mask);
1533 	sa.sa_flags = 0;
1534 	sigaction(SIGPIPE, &sa, NULL);
1535 	sigaction(SIGUSR2, &sa, NULL);
1536 
1537 	// key for each thread
1538 	err = pthread_key_create(&buffer_key, NULL);
1539 	if ( err ) {
1540 		LogError("pthread_key() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1541 		exit(255);
1542 	}
1543 
1544 	// prepare flow thread args
1545 	p_flow_thread_args = (p_flow_thread_args_t *)malloc(sizeof(p_flow_thread_args_t));
1546 	if ( !p_flow_thread_args ) {
1547 		LogError("malloc() error in %s line %d: %s\n",
1548 			__FILE__, __LINE__, strerror(errno) );
1549 		exit(255);
1550 	}
1551 	p_flow_thread_args->fs			   = fs;
1552 	p_flow_thread_args->t_win		   = t_win;
1553 	p_flow_thread_args->compress	   = compress;
1554 	p_flow_thread_args->live		   = device != NULL;
1555 	p_flow_thread_args->subdir_index   = subdir_index;
1556 	p_flow_thread_args->parent		   = pthread_self();
1557 	p_flow_thread_args->NodeList	   = NewNodeList();
1558 	p_flow_thread_args->time_extension = time_extension;
1559 
1560 	err = 0;
1561 
1562 	err = pthread_create(&p_flow_thread_args->tid, NULL, p_flow_thread, (void *)p_flow_thread_args);
1563 	if ( err ) {
1564 		LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1565 		exit(255);
1566 	}
1567 	dbg_printf("Started flow thread[%lu]\n", (long unsigned)p_flow_thread_args->tid);
1568 
1569 	// prepare packet thread args
1570 	p_packet_thread_args = (p_packet_thread_args_t *)malloc(sizeof(p_packet_thread_args_t));
1571 	if ( !p_packet_thread_args ) {
1572 		LogError("malloc() error in %s line %d: %s\n",
1573 			__FILE__, __LINE__, strerror(errno) );
1574 		exit(255);
1575 	}
1576 	p_packet_thread_args->pcap_dev		 = pcap_dev;
1577 	p_packet_thread_args->t_win			 = t_win;
1578 	p_packet_thread_args->subdir_index	 = subdir_index;
1579 	p_packet_thread_args->pcap_datadir	 = pcap_datadir;
1580 	p_packet_thread_args->live			 = device != NULL;
1581 	p_packet_thread_args->parent		 = pthread_self();
1582 	p_packet_thread_args->NodeList		 = p_flow_thread_args->NodeList;
1583 	p_packet_thread_args->time_extension = p_flow_thread_args->time_extension;
1584 
1585 	err = pthread_create(&p_packet_thread_args->tid, NULL, p_packet_thread, (void *)p_packet_thread_args);
1586 	if ( err ) {
1587 		LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1588 		exit(255);
1589 	}
1590 	dbg_printf("Started packet thread[%lu]\n", (long unsigned)p_packet_thread_args->tid);
1591 
1592 	// Wait till done
1593 	WaitDone();
1594 
1595 	dbg_printf("Signal packet thread to terminate\n");
1596 	SignalThreadTerminate((thread_info_t *)p_packet_thread_args, NULL);
1597 
1598 	dbg_printf("Signal flow thread to terminate\n");
1599 	SignalThreadTerminate((thread_info_t *)p_flow_thread_args, &p_packet_thread_args->NodeList->c_list);
1600 
1601 	IPFragTree_free();
1602 
1603 	// free arg list
1604 	free((void *)p_packet_thread_args);
1605 	free((void *)p_flow_thread_args);
1606 
1607 	LogInfo("Terminating nfpcapd.");
1608 
1609 	if ( expire == 0 && ReadStatInfo(fs->datadir, &dirstat, LOCK_IF_EXISTS) == STATFILE_OK ) {
1610 		UpdateBookStat(dirstat, fs->bookkeeper);
1611 		WriteStatInfo(dirstat);
1612 		LogInfo("Updating statinfo in directory '%s'", datadir);
1613 	}
1614 
1615 	ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1616 	pcap_close(pcap_dev->handle);
1617 
1618 	if ( strlen(pidfile) )
1619 		unlink(pidfile);
1620 
1621 	EndLog();
1622 
1623 	exit(EXIT_SUCCESS);
1624 } /* End of main */
1625 
1626