1 /*
2 * Copyright (c) 2013-2020, Peter Haag
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * * Neither the name of the author nor the names of its contributors may be
14 * used to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 *
29 */
30
31 #include "config.h"
32
33 #ifdef HAVE_FEATURES_H
34 #include <features.h>
35 #endif
36
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/wait.h>
41 #include <sys/time.h>
42 #include <sys/stat.h>
43 #include <stdio.h>
44 #include <stdarg.h>
45 #include <pwd.h>
46 #include <grp.h>
47 #include <time.h>
48 #include <fcntl.h>
49 #include <sys/stat.h>
50 #include <sys/param.h>
51 #include <sys/socket.h>
52 #include <pthread.h>
53 #include <errno.h>
54 #include <signal.h>
55 #include <string.h>
56 #include <assert.h>
57 // #include <mcheck.h>
58
59 #ifdef HAVE_STDINT_H
60 #include <stdint.h>
61 #endif
62
63 #include <netinet/in.h>
64 #include <netinet/in_systm.h>
65 #include <netinet/ip.h>
66 #include <netinet/ip6.h>
67 #include <netinet/tcp.h>
68 #include <netinet/udp.h>
69 #include <netinet/ip_icmp.h>
70 #include <arpa/inet.h>
71
72
73 #ifdef HAVE_NET_BPF_H
74 # include <net/bpf.h>
75 #else
76 #ifdef HAVE_PCAP_BPF_H
77 # include <pcap/bpf.h>
78 #else
79 # error missing bpf header
80 #endif
81 #endif
82
83 #include <pcap.h>
84
85 #include "util.h"
86 #include "nfdump.h"
87 #include "nffile.h"
88 #include "nfx.h"
89 #include "expire.h"
90 #include "nfnet.h"
91 #include "flist.h"
92 #include "nfstatfile.h"
93 #include "bookkeeper.h"
94 #include "collector.h"
95 #include "exporter.h"
96 #include "ipfrag.h"
97 #include "flowtree.h"
98 #include "netflow_pcap.h"
99 #include "pcaproc.h"
100
// NOTE(review): TIME_WINDOW, FILTER and DEFAULT_DIR are not referenced in the
// part of the file reviewed here - presumably defaults used by main(); confirm.
#define TIME_WINDOW 300
// promiscuous flag handed to pcap_open_live() in setup_pcap_live()
#define PROMISC 1
// read timeout handed to pcap_open_live() in setup_pcap_live() (500ms)
#define TIMEOUT 500
#define FILTER ""
#define DEFAULT_DIR "/var/tmp"

// fallback for pcap headers which do not define the Linux cooked capture link type
#ifndef DLT_LINUX_SLL
#define DLT_LINUX_SLL 113
#endif

// p_flow_thread() calls Expire_FlowTree() when the node time has advanced
// by more than this many seconds since the last expire run
#define EXPIREINTERVALL 10
112
// NOTE(review): not referenced in the part of the file reviewed here
int verbose = 0;

/*
 * base name of the pcap file currently being written;
 * p_pcap_flush_thread() appends ".<pid>" and places it in the pcap data directory
 */
#define PCAP_DUMPFILE "pcap.current"

static const char *nfdump_version = VERSION;

// only referenced from commented-out code in p_flow_thread()
static int launcher_pid;
// NOTE(review): m_done/terminate are not used in the part of the file reviewed
// here - presumably used by WaitDone(); confirm.
static pthread_mutex_t m_done = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t terminate = PTHREAD_COND_INITIALIZER;
// thread specific data key: every thread stores its own argument block under
// this key, Interrupt_handler() reads it back to find the block of the calling thread
static pthread_key_t buffer_key;

// NOTE(review): setup_pcap_live() and setup_pcap_Ffile() declare locals of the
// same names which shadow these globals; the globals are not written there.
uint32_t linktype;
uint32_t linkoffset;
129
130 // Common thread info struct
// Common thread info struct
// Every thread argument struct below starts with these three members in the
// same order: Interrupt_handler() and SignalThreadTerminate() access the
// argument blocks through a thread_info_t pointer.
typedef struct thread_info_s {
    pthread_t tid;  // thread id - Interrupt_handler() compares it with pthread_self()
    int done;       // termination flag - set to 1 by Interrupt_handler() and on the thread error paths
    int exit;       // exit status of the thread - 0 at start, 255 on the error paths
} thread_info_t;
136
// Argument block for p_pcap_flush_thread()
typedef struct p_pcap_flush_thread_args_s {
    // common thread info struct
    // must stay first and in this order - the block is passed as thread_info_t
    pthread_t tid;
    int done;
    int exit;

    // the parent
    // receives SIGUSR1 when the flush thread gives up
    pthread_t parent;

    // arguments
    int subdir_index;       // non zero: rotated files are placed into the sub directory returned by GetSubDir()
    char *pcap_datadir;     // directory of the current dump file and of the rotated pcap files
    char *time_extension;   // strftime() format for the time part of the rotated file name
    pcap_dev_t *pcap_dev;   // its handle is passed to OpenNewPcapFile()
    pcapfile_t *pcapfile;   // pcap file/buffer struct handed over by p_packet_thread()
} p_pcap_flush_thread_args_t;
153
// Argument block for p_packet_thread()
typedef struct p_packet_thread_args_s {
    // common thread info struct
    // must stay first and in this order - see thread_info_t
    pthread_t tid;
    int done;
    int exit;

    // the parent
    // receives SIGUSR1 when the packet thread terminates on error
    pthread_t parent;

    // arguments
    NodeList_t *NodeList;   // push new nodes into this list
    pcap_dev_t *pcap_dev;   // packet source - packets are read with pcap_next_ex() from its handle
    time_t t_win;           // rotation window in seconds
    int subdir_index;       // forwarded to the flush thread
    char *pcap_datadir;     // NULL: no pcap files are written and no flush thread is started
    char *time_extension;   // forwarded to the flush thread
    int live;               // passed to RotateFile(); non zero also enables the pcap_stats() report
} p_packet_thread_args_t;
172
// Argument block for p_flow_thread()
typedef struct p_flow_thread_args_s {
    // common thread info struct
    // must stay first and in this order - see thread_info_t
    pthread_t tid;
    int done;
    int exit;
    // the parent
    // receives SIGUSR1 when the flow thread gives up
    pthread_t parent;

    // arguments
    NodeList_t *NodeList;   // pop new nodes from this list
    FlowSource_t *fs;       // flow source: output file, data directory, ident and statistics
    time_t t_win;           // rotation window in seconds
    char *time_extension;   // strftime() format for the time part of the rotated file name
    int subdir_index;       // non zero: rotated files are placed into the sub directory returned by GetSubDir()
    int compress;           // passed to OpenNewFile()
    int live;               // passed to CacheCheck()
} p_flow_thread_args_t;
190
191 /*
192 * Function prototypes
193 */
// print the command line help text
static void usage(char *name);

// detach from the terminal: double fork, new session, stdio to /dev/null
static void daemonize(void);

// signal handler: flags the calling thread as done
static void Interrupt_handler(int sig);

// switch to the given user and/or group - root only
static void SetPriv(char *userid, char *groupid );

// open a live capture on a device, NULL device: first device found
static pcap_dev_t *setup_pcap_live(char *device, char *filter, int snaplen);

// attach a pcap reader to an already open stream
static pcap_dev_t *setup_pcap_Ffile(FILE *fp, char *filter, int snaplen);

// open a pcap file by name and attach a pcap reader to it
static pcap_dev_t *setup_pcap_file(char *pcap_file, char *filter, int snaplen);

// NOTE(review): defined outside the part of the file reviewed here
static void WaitDone(void);

// signal a thread with SIGUSR2, optionally wake its condition, and join it
static void SignalThreadTerminate(thread_info_t *thread_info, pthread_cond_t *thread_cond );

// thread: writes the pcap buffers to disk and rotates the pcap files
static void *p_pcap_flush_thread(void *thread_data);

// thread: pops flow nodes from the node list, writes and rotates the flow files
static void *p_flow_thread(void *thread_data);

// thread: reads packets from the pcap handle and feeds the node list
static void *p_packet_thread(void *thread_data);
217
218 /*
219 * Functions
220 */
221
/*
 * Print the command line synopsis followed by the list of options to stdout.
 *
 * name: program name inserted into the synopsis line
 */
static void usage(char *name) {
    // option list - contains no conversion specifications, so it is written verbatim
    static const char option_list[] =
        "-h\t\tthis text you see right here\n"
        "-u userid\tChange user to username\n"
        "-g groupid\tChange group to groupname\n"
        "-i interface\tread packets from interface\n"
        "-r pcapfile\tread packets from file\n"
        "-B num\tset the node cache size. (default 524288)\n"
        "-s snaplen\tset the snapshot length - default 1526\n"
        "-e active,inactive\tset the active,inactive flow expire time (s) - default 300,60\n"
        "-l flowdir \tset the flow output directory. (no default) \n"
        "-p pcapdir \tset the pcapdir directory. (optional) \n"
        "-S subdir\tSub directory format. see nfcapd(1) for format\n"
        "-I Ident\tset the ident string for stat file. (default 'none')\n"
        "-P pidfile\tset the PID file\n"
        "-t time frame\tset the time window to rotate pcap/nfcapd file\n"
        "-z\t\tLZO compress flows in output file.\n"
        "-y\t\tLZ4 compress flows in output file.\n"
        "-j\t\tBZ2 compress flows in output file.\n"
        "-E\t\tPrint extended format of netflow data. for debugging purpose only.\n"
        "-T\t\tInclude extension tags in records.\n"
        "-D\t\tdetach from terminal (daemonize)\n";

    printf("usage %s [options] [\"pcap filter\"]\n", name);
    fputs(option_list, stdout);
} // End of usage
246
Interrupt_handler(int sig)247 static void Interrupt_handler(int sig) {
248 pthread_t tid = pthread_self();
249 thread_info_t *thread_info;
250
251 thread_info = (thread_info_t *)pthread_getspecific(buffer_key);
252 if ( !thread_info ) {
253 LogError("[%lu] Interrupt_handler() failed to get thread specific data block", (long unsigned)tid);
254 } else {
255 if ( thread_info->tid != tid ) {
256 LogError("[%lu] Interrupt_handler() missmatch tid in thread_info", (long unsigned)tid);
257 } else {
258 thread_info->done = 1;
259 }
260 }
261
262 } // End of signal_handler
263
/*
 * Helper for daemonize(): fork once and continue in the child only.
 * The parent leaves with _exit(0). A failed fork() is reported on stderr and
 * terminates the process with status 255.
 */
static void daemonize_fork(void) {
    switch (fork()) {
        case 0:
            // child
            return;
        case -1:
            // error - do not report success to the caller of the program
            fprintf(stderr, "fork() error: %s\n", strerror(errno));
            exit(255);
        default:
            // parent
            _exit(0);
    }
} // End of daemonize_fork

/*
 * Helper for daemonize(): make descriptor 'target' refer to /dev/null, opened
 * with 'flags'. If open() already returned 'target', nothing else is needed.
 * If /dev/null can not be opened, the descriptor is left as it is.
 */
static void daemonize_redirect(int target, int flags) {
    int fd = open("/dev/null", flags);
    if ( fd < 0 ) {
        fprintf(stderr, "open() /dev/null error: %s\n", strerror(errno));
        return;
    }
    if ( fd != target ) {
        dup2(fd, target);
        close(fd);
    }
} // End of daemonize_redirect

/*
 * Detach the process from the terminal:
 * fork, create a new session, fork again and connect stdin, stdout and
 * stderr to /dev/null. Only the second child returns from this function.
 * A failing fork() or setsid() terminates the process with status 255.
 */
static void daemonize(void) {

    daemonize_fork();

    if (setsid() < 0) {
        fprintf(stderr, "setsid() error: %s\n", strerror(errno));
        exit(255);
    }

    // Double fork
    daemonize_fork();

    daemonize_redirect(0, O_RDONLY);
    daemonize_redirect(1, O_WRONLY);
    daemonize_redirect(2, O_WRONLY);

} // End of daemonize
317
/*
 * Helper for SetPriv(): convert a decimal id string.
 * Returns the id, or 0 if the string is not a plain decimal number.
 * 0 doubles as the error value because SetPriv() rejects id 0 anyway.
 */
static unsigned long SetPriv_numeric_id(const char *str) {
    char *end = NULL;
    unsigned long id;

    // strtoul() would accept leading white space and a sign - insist on a digit
    if ( str[0] < '0' || str[0] > '9' )
        return 0;

    errno = 0;
    id = strtoul(str, &end, 10);
    if ( errno != 0 || *end != '\0' )
        return 0;

    return id;
} // End of SetPriv_numeric_id

/*
 * Change the group id and/or user id of the process.
 *
 * userid:  user name or decimal uid, NULL to keep the current user
 * groupid: group name or decimal gid, NULL to keep the current group
 *
 * Returns without any action if both arguments are NULL. Otherwise the
 * caller must be root. Names are resolved with getpwnam()/getgrnam(); a
 * string that is not a known name is taken as a decimal id. Id 0 and
 * strings that are neither a known name nor a plain decimal number are
 * rejected. The group is changed before the user. Every failure terminates
 * the process with exit status 255.
 *
 * NOTE(review): only setgid() and setuid() are called - supplementary groups
 * of the calling process are not touched.
 */
static void SetPriv(char *userid, char *groupid ) {
    uid_t newuid = 0;
    gid_t newgid = 0;
    int err;

    if ( userid == NULL && groupid == NULL )
        return;

    if ( getuid() != 0 ) {
        LogError("Only root wants to change uid/gid");
        fprintf(stderr, "ERROR: Only root wants to change uid/gid\n");
        exit(255);
    }

    if ( userid ) {
        struct passwd *pw_entry = getpwnam(userid);
        if ( pw_entry ) {
            newuid = pw_entry->pw_uid;
        } else {
            unsigned long id = SetPriv_numeric_id(userid);
            newuid = (uid_t)id;
            // reject ids which do not fit into uid_t
            if ( (unsigned long)newuid != id )
                newuid = 0;
        }

        if ( newuid == 0 ) {
            fprintf (stderr,"Invalid user '%s'\n", userid);
            exit(255);
        }
    }

    if ( groupid ) {
        struct group *gr_entry = getgrnam(groupid);
        if ( gr_entry ) {
            newgid = gr_entry->gr_gid;
        } else {
            unsigned long id = SetPriv_numeric_id(groupid);
            newgid = (gid_t)id;
            // reject ids which do not fit into gid_t
            if ( (unsigned long)newgid != id )
                newgid = 0;
        }

        if ( newgid == 0 ) {
            fprintf (stderr,"Invalid group '%s'\n", groupid);
            exit(255);
        }

        err = setgid(newgid);
        if ( err ) {
            LogError("Can't set group id %ld for group '%s': %s", (long)newgid, groupid, strerror(errno));
            fprintf (stderr,"Can't set group id %ld for group '%s': %s\n", (long)newgid, groupid, strerror(errno));
            exit(255);
        }

    }

    // the user id is changed last - root is still needed for setgid() above
    if ( newuid ) {
        err = setuid(newuid);
        if ( err ) {
            LogError("Can't set user id %ld for user '%s': %s", (long)newuid, userid, strerror(errno));
            fprintf (stderr,"Can't set user id %ld for user '%s': %s\n", (long)newuid, userid, strerror(errno));
            exit(255);
        }
    }

} // End of SetPriv
373
setup_pcap_live(char * device,char * filter,int snaplen)374 static pcap_dev_t *setup_pcap_live(char *device, char *filter, int snaplen) {
375 pcap_t *handle = NULL;
376 pcap_dev_t *pcap_dev = NULL;
377 pcap_if_t *alldevsp = NULL;
378 char errbuf[PCAP_ERRBUF_SIZE];
379 bpf_u_int32 mask; /* Our netmask */
380 bpf_u_int32 net; /* Our IP */
381 struct bpf_program filter_code;
382 uint32_t linkoffset, linktype;
383
384 dbg_printf("Enter function: %s\n", __FUNCTION__);
385
386 if (device == NULL) {
387 if ( pcap_findalldevs(&alldevsp, errbuf) == -1 ) {
388 LogError("pcap_findalldevs() error: %s in %s line %d",
389 errbuf, __FILE__, __LINE__);
390 return NULL;
391 }
392 if ( alldevsp == NULL ) {
393 LogError("Couldn't find default device");
394 return NULL;
395 }
396 device = alldevsp[0].name;
397 LogInfo("Listen on %s", device);
398 }
399
400 /* Find the properties for the device */
401 if (pcap_lookupnet(device, &net, &mask, errbuf) == -1) {
402 LogError("Couldn't get netmask for device %s: %s", device, errbuf);
403 net = 0;
404 mask = 0;
405 }
406
407 /*
408 * Open the packet capturing device with the following values:
409 *
410 * PROMISC: on
411 * The interface needs to be in promiscuous mode to capture all
412 * network traffic on the localnet.
413 * TIMEOUT: 500ms
414 * A 500 ms timeout is probably fine for most networks. For
415 * architectures that support it, you might want tune this value
416 * depending on how much traffic you're seeing on the network.
417 */
418 handle = pcap_open_live(device, snaplen, PROMISC, TIMEOUT, errbuf);
419 if (handle == NULL) {
420 LogError("Couldn't open device %s: %s", device, errbuf);
421 return NULL;
422 }
423
424 // XXX
425 // int pcap_set_buffer_size(pcap_t *p, int buffer_size);
426
427 if ( filter ) {
428 /* Compile and apply the filter */
429 if (pcap_compile(handle, &filter_code, filter, 0, net) == -1) {
430 LogError("Couldn't parse filter %s: %s", filter, pcap_geterr(handle));
431 return NULL;
432 }
433 if (pcap_setfilter(handle, &filter_code) == -1) {
434 LogError("Couldn't install filter %s: %s", filter, pcap_geterr(handle));
435 return NULL;
436 }
437 }
438
439 pcap_dev = (pcap_dev_t *)calloc(1, sizeof(pcap_dev_t));
440 if ( !pcap_dev ) {
441 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
442 return NULL;
443 }
444
445 linkoffset = 0;
446 linktype = pcap_datalink(handle);
447 switch ( linktype ) {
448 case DLT_RAW:
449 linkoffset = 0;
450 break;
451 case DLT_PPP:
452 linkoffset = 2;
453 break;
454 case DLT_NULL:
455 linkoffset = 4;
456 break;
457 case DLT_LOOP:
458 linkoffset = 14;
459 break;
460 case DLT_EN10MB:
461 linkoffset = 14;
462 break;
463 case DLT_LINUX_SLL:
464 linkoffset = 16;
465 break;
466 case DLT_IEEE802_11:
467 linkoffset = 22;
468 break;
469 default:
470 LogError("Unsupported data link type %i", linktype);
471 return NULL;
472 }
473
474 pcap_dev->handle = handle;
475 pcap_dev->snaplen = snaplen;
476 pcap_dev->linkoffset = linkoffset;
477 pcap_dev->linktype = linktype;
478
479 return pcap_dev;
480
481 } // End of setup_pcap_live
482
setup_pcap_file(char * pcap_file,char * filter,int snaplen)483 static pcap_dev_t *setup_pcap_file(char *pcap_file, char *filter, int snaplen) {
484 FILE *fp;
485
486 fp = fopen(pcap_file, "rb");
487 if ( !fp ) {
488 LogError("Couldn't open file: %s: %s", pcap_file, strerror(errno));
489 return NULL;
490 }
491 return setup_pcap_Ffile(fp, filter, snaplen);
492
493 } // End of setup_pcap_file
494
setup_pcap_Ffile(FILE * fp,char * filter,int snaplen)495 static pcap_dev_t *setup_pcap_Ffile(FILE *fp, char *filter, int snaplen) {
496 pcap_t *handle;
497 pcap_dev_t *pcap_dev;
498 char errbuf[PCAP_ERRBUF_SIZE];
499 uint32_t linkoffset, linktype;
500
501 dbg_printf("Enter function: %s\n", __FUNCTION__);
502
503 if ( !fp )
504 return NULL;
505
506 handle = pcap_fopen_offline(fp, errbuf);
507 if (handle == NULL) {
508 LogError("Couldn't attach FILE handle %s", errbuf);
509 return NULL;
510 }
511
512 if ( filter ) {
513 struct bpf_program filter_code;
514 bpf_u_int32 netmask = 0;
515 // Compile and apply the filter
516 if (pcap_compile(handle, &filter_code, filter, 0, netmask) == -1) {
517 LogError("Couldn't parse filter %s: %s", filter, pcap_geterr(handle));
518 return NULL;
519 }
520 if (pcap_setfilter(handle, &filter_code) == -1) {
521 LogError("Couldn't install filter %s: %s", filter, pcap_geterr(handle));
522 return NULL;
523 }
524 }
525
526 linkoffset = 0;
527 linktype = pcap_datalink(handle);
528 switch ( linktype ) {
529 case DLT_RAW:
530 linkoffset = 0;
531 break;
532 case DLT_PPP:
533 linkoffset = 2;
534 break;
535 case DLT_NULL:
536 linkoffset = 4;
537 break;
538 case DLT_LOOP:
539 linkoffset = 14;
540 break;
541 case DLT_EN10MB:
542 linkoffset = 14;
543 break;
544 case DLT_LINUX_SLL:
545 linkoffset = 16;
546 break;
547 case DLT_IEEE802_11:
548 linkoffset = 22;
549 break;
550 default:
551 LogError("Unsupported data link type %i", linktype);
552 return NULL;
553 }
554
555 pcap_dev = (pcap_dev_t *)calloc(1, sizeof(pcap_dev_t));
556 if ( !pcap_dev ) {
557 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
558 return NULL;
559 }
560
561 pcap_dev->handle = handle;
562 pcap_dev->snaplen = snaplen;
563 pcap_dev->linkoffset = linkoffset;
564 pcap_dev->linktype = linktype;
565
566 return pcap_dev;
567
568 } // End of setup_pcap_file
569
/*
 * Ask a thread to terminate and wait for it.
 *
 * thread_info: info block of the thread
 * thread_cond: condition to signal after the thread has been signalled,
 *              NULL if there is none
 *
 * If the thread has not flagged itself done yet, it is sent SIGUSR2 and
 * thread_cond, if given, is signalled. The thread is then joined in either
 * case and its exit status is logged.
 */
static void SignalThreadTerminate(thread_info_t *thread_info, pthread_cond_t *thread_cond ) {

    if ( !thread_info->done ) {
        dbg_printf("Signal thread[%lu] to terminate\n", (long unsigned)thread_info->tid);
        if ( pthread_kill(thread_info->tid, SIGUSR2) != 0 ) {
            dbg_printf("Failed to signal thread[%lu]\n", (long unsigned)thread_info->tid);
        }

        // in case of a condition - signal condition
        if ( thread_cond )
            pthread_cond_signal(thread_cond);

    } else {
        dbg_printf("thread[%lu] gone already\n", (long unsigned)thread_info->tid);
    }

    if( pthread_join(thread_info->tid, NULL) == 0 ) {
        dbg_printf("thread %lu joined\n", (long unsigned)thread_info->tid);
    } else {
        dbg_printf("thread %lu no join\n", (long unsigned)thread_info->tid);
    }

    // pthread_t needs the same cast as in the messages above to match %lu
    LogInfo("Exit status thread[%lu]: %i", (long unsigned)thread_info->tid, thread_info->exit);

} // End of SignalThreadTerminate
595
p_flow_thread(void * thread_data)596 __attribute__((noreturn)) static void *p_flow_thread(void *thread_data) {
597 // argument dispatching
598 p_flow_thread_args_t *args = (p_flow_thread_args_t *)thread_data;
599 time_t t_win = args->t_win;
600 int subdir_index = args->subdir_index;
601 int compress = args->compress;
602 int live = args->live;
603 FlowSource_t *fs = args->fs;
604 char *time_extension = args->time_extension;
605 static time_t lastExpire = 0;
606 time_t t_start, t_clock;
607 int err, done;
608
609 done = 0;
610 args->done = 0;
611 args->exit = 0;
612
613 err = pthread_setspecific( buffer_key, (void *)args );
614 if ( err ) {
615 LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
616 (long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
617 args->done = 1;
618 args->exit = 255;
619 pthread_kill(args->parent, SIGUSR1);
620 pthread_exit((void *)args);
621 }
622
623 if ( !Init_pcap2nf() ) {
624 args->done = 1;
625 args->exit = 255;
626 pthread_kill(args->parent, SIGUSR1);
627 pthread_exit((void *)args);
628 }
629
630 // prepare file
631 fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
632 if ( !fs->nffile ) {
633 args->done = 1;
634 args->exit = 255;
635 pthread_kill(args->parent, SIGUSR1);
636 pthread_exit((void *)args);
637 }
638
639 // init vars
640 fs->bad_packets = 0;
641 fs->first_seen = 0xffffffffffffLL;
642 fs->last_seen = 0;
643
644 t_start = 0;
645 t_clock = 0;
646 while ( 1 ) {
647 struct FlowNode *Node;
648
649 Node = Pop_Node(args->NodeList, &args->done);
650 if ( Node ) {
651 t_clock = Node->t_last.tv_sec;
652 dbg_printf("p_flow_thread() Next Node\n");
653 } else {
654 done = args->done;
655 dbg_printf("p_flow_thread() NULL Node\n");
656 }
657
658 if ( t_start == 0 ) {
659 t_start = t_clock - (t_clock % t_win);
660 }
661
662 if (((t_clock - t_start) >= t_win) || done) { /* rotate file */
663 struct tm *when;
664 nffile_t *nffile;
665 char FullName[MAXPATHLEN];
666 char netflowFname[128];
667 char error[256];
668 char *subdir, fmt[24];
669 uint32_t NumFlows;
670
671 // flush all flows to disk
672 DumpNodeStat(args->NodeList);
673 if (done)
674 NumFlows = Flush_FlowTree(fs);
675 else
676 NumFlows = Expire_FlowTree(fs, t_clock);
677
678 when = localtime(&t_start);
679 strftime(fmt, sizeof(fmt), time_extension, when);
680
681 nffile = fs->nffile;
682
683 // prepare sub dir hierarchy
684 if ( subdir_index ) {
685 subdir = GetSubDir(when);
686 if ( !subdir ) {
687 // failed to generate subdir path - put flows into base directory
688 LogError("Failed to create subdir path!");
689
690 // failed to generate subdir path - put flows into base directory
691 subdir = NULL;
692 snprintf(netflowFname, 127, "nfcapd.%s", fmt);
693 } else {
694 snprintf(netflowFname, 127, "%s/nfcapd.%s", subdir, fmt);
695 }
696
697 } else {
698 subdir = NULL;
699 snprintf(netflowFname, 127, "nfcapd.%s", fmt);
700 }
701 netflowFname[127] = '\0';
702
703 if ( subdir && !SetupSubDir(fs->datadir, subdir, error, 255) ) {
704 // in this case the flows get lost! - the rename will fail
705 // but this should not happen anyway, unless i/o problems, inode problems etc.
706 LogError("Ident: %s, Failed to create sub hier directories: %s", fs->Ident, error );
707 }
708
709 if ( nffile->block_header->NumRecords ) {
710 // flush current buffer to disc
711 if ( WriteBlock(nffile) <= 0 )
712 LogError("Ident: %s, failed to write output buffer to disk: '%s'" , fs->Ident, strerror(errno));
713 } // else - no new records in current block
714
715 // prepare full filename
716 snprintf(FullName, MAXPATHLEN-1, "%s/%s", fs->datadir, netflowFname);
717 FullName[MAXPATHLEN-1] = '\0';
718
719 // update stat record
720 // if no flows were collected, fs->last_seen is still 0
721 // set first_seen to start of this time slot, with twin window size.
722 if ( fs->last_seen == 0 ) {
723 fs->first_seen = (uint64_t)1000 * (uint64_t)t_start;
724 fs->last_seen = (uint64_t)1000 * (uint64_t)(t_start + t_win);
725 }
726 nffile->stat_record->first_seen = fs->first_seen/1000;
727 nffile->stat_record->msec_first = fs->first_seen - nffile->stat_record->first_seen*1000;
728 nffile->stat_record->last_seen = fs->last_seen/1000;
729 nffile->stat_record->msec_last = fs->last_seen - nffile->stat_record->last_seen*1000;
730
731 // Flush Exporter Stat to file
732 FlushExporterStats(fs);
733 // Close file
734 CloseUpdateFile(nffile, fs->Ident);
735
736 // if rename fails, we are in big trouble, as we need to get rid of the old .current file
737 // otherwise, we will loose flows and can not continue collecting new flows
738 if ( !RenameAppend(fs->current, FullName) ) {
739 LogError("Ident: %s, Can't rename dump file: %s", fs->Ident, strerror(errno));
740 LogError("Ident: %s, Serious Problem! Fix manually", fs->Ident);
741 /* XXX
742 if ( launcher_pid )
743 commbuff->failed = 1;
744 */
745 // we do not update the books here, as the file failed to rename properly
746 // otherwise the books may be wrong
747 } else {
748 struct stat fstat;
749 /* XXX
750 if ( launcher_pid )
751 commbuff->failed = 0;
752 */
753 // Update books
754 stat(FullName, &fstat);
755 UpdateBooks(fs->bookkeeper, t_start, 512*fstat.st_blocks);
756 }
757
758 LogInfo("Ident: '%s' Flows: %llu, Packets: %llu, Bytes: %llu, Max Flows: %u, Fragments: %u",
759 fs->Ident, (unsigned long long)nffile->stat_record->numflows, (unsigned long long)nffile->stat_record->numpackets,
760 (unsigned long long)nffile->stat_record->numbytes, NumFlows, IPFragEntries());
761
762 // reset stats
763 fs->bad_packets = 0;
764 fs->first_seen = 0xffffffffffffLL;
765 fs->last_seen = 0;
766
767 // Dump all extension maps and exporters to the buffer
768 FlushStdRecords(fs);
769
770 if ( done )
771 break;
772
773 t_start = t_clock - (t_clock % t_win);
774
775 nffile = OpenNewFile(fs->current, nffile, compress, 0, NULL);
776 if ( !nffile ) {
777 LogError("Fatal: OpenNewFile() failed for ident: %s", fs->Ident);
778 args->done = 1;
779 args->exit = 255;
780 pthread_kill(args->parent, SIGUSR1);
781 break;
782 }
783 }
784
785 time_t when;
786 if ( Node ) {
787 if ( Node->fin != SIGNAL_NODE ) {
788 // Process the Node
789 ProcessFlowNode(fs, Node);
790 }
791 when = Node->t_last.tv_sec;
792 } else {
793 when = time(NULL);
794 }
795 if ( (when - lastExpire) > EXPIREINTERVALL ) {
796 Expire_FlowTree(fs, when);
797 lastExpire = when;
798 }
799 CacheCheck(fs, when, live);
800
801 }
802
803 while ( fs ) {
804 DisposeFile(fs->nffile);
805 fs = fs->next;
806 }
807 LogInfo("Terminating flow processng: exit: %i", args->exit);
808 dbg_printf("End flow thread[%lu]\n", (long unsigned)args->tid);
809
810 pthread_exit((void *)args);
811 /* NOTREACHED */
812
813 } // End of p_flow_thread
814
p_pcap_flush_thread(void * thread_data)815 __attribute__((noreturn)) static void *p_pcap_flush_thread(void *thread_data) {
816 // argument dispatching
817 p_pcap_flush_thread_args_t *args = (p_pcap_flush_thread_args_t *)thread_data;
818 char *pcap_datadir = args->pcap_datadir;
819 pcap_dev_t *pcap_dev = args->pcap_dev;
820 pcapfile_t *pcapfile = args->pcapfile;
821 char *time_extension = args->time_extension;
822 char pcap_dumpfile[MAXPATHLEN];
823 int err;
824 int runs = 0;
825
826 dbg_printf("New flush thread[%lu]\n", (long unsigned)args->tid);
827 args->done = 0;
828 args->exit = 0;
829
830 err = pthread_setspecific( buffer_key, (void *)args );
831 if ( err ) {
832 LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
833 (long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
834 args->done = 1;
835 args->exit = 255;
836 pthread_kill(args->parent, SIGUSR1);
837 pthread_exit((void *)args);
838 }
839
840 snprintf(pcap_dumpfile, MAXPATHLEN-1, "%s/%s.%lu", pcap_datadir , PCAP_DUMPFILE, (unsigned long)getpid() );
841 pcapfile = OpenNewPcapFile(pcap_dev->handle, pcap_dumpfile, pcapfile);
842 if ( !pcapfile ) {
843 args->done = 1;
844 args->exit = 255;
845 pthread_kill(args->parent, SIGUSR1);
846 pthread_exit((void *)args);
847 /* NOTREACHED */
848 }
849
850
851 // wait for alternate buffer to be ready to flush
852 while ( 1 ) {
853 pthread_mutex_lock(&pcapfile->m_pbuff);
854 while ( pcapfile->alternate_size == 0 && !args->done ) {
855 pthread_cond_wait(&pcapfile->c_pbuff, &pcapfile->m_pbuff);
856 }
857 dbg_printf("Flush cycle\n");
858 runs++;
859 // try to flush alternate buffer
860 if ( pcapfile->alternate_size ) {
861 // flush alternate buffer
862 dbg_printf("Flush alternate\n");
863 if ( write(pcapfile->pfd, (void *)pcapfile->alternate_buffer, pcapfile->alternate_size) <= 0 ) {
864 LogError("write() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
865 }
866 pcapfile->alternate_size = 0;
867 }
868
869 // if we are done, try to flsuh main data buffer
870 if ( args->done && pcapfile->data_size ) {
871 dbg_printf("Done: Flush all buffers\n");
872 // flush alternate buffer
873 if ( write(pcapfile->pfd, (void *)pcapfile->data_buffer, pcapfile->data_size) <= 0 ) {
874 LogError("write() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
875 }
876 pcapfile->data_size = 0;
877 pcapfile->data_ptr = pcapfile->data_buffer;
878 }
879
880 // check if we need to rotate/close the file
881 if ( args->done || pcapfile->t_CloseRename ) { /* rotate file */
882 struct tm *when;
883 char FullName[MAXPATHLEN];
884 char pcapFname[128];
885 char error[256];
886 char *subdir, fmt[24];
887 int err;
888
889 dbg_printf("Flush rotate file\n");
890 when = localtime(&pcapfile->t_CloseRename);
891 strftime(fmt, sizeof(fmt), time_extension, when);
892
893 pcapfile->t_CloseRename = 0;
894
895 // prepare sub dir hierarchy
896 if ( args->subdir_index ) {
897 subdir = GetSubDir(when);
898 if ( !subdir ) {
899 // failed to generate subdir path - put flows into base directory
900 LogError("Failed to create subdir path!");
901
902 // failed to generate subdir path - put flows into base directory
903 subdir = NULL;
904 snprintf(pcapFname, 127, "pcapd.%s", fmt);
905 } else {
906 snprintf(pcapFname, 127, "%s/pcapd.%s", subdir, fmt);
907 }
908
909 } else {
910 subdir = NULL;
911 snprintf(pcapFname, 127, "pcapd.%s", fmt);
912 }
913 pcapFname[127] = '\0';
914
915 if ( subdir && !SetupSubDir(pcap_datadir, subdir, error, 255) ) {
916 // in this case the flows get lost! - the rename will fail
917 // but this should not happen anyway, unless i/o problems, inode problems etc.
918 LogError("p_packet_thread() Failed to create sub hier directories: %s", error );
919 }
920
921 // prepare full filename
922 snprintf(FullName, MAXPATHLEN-1, "%s/%s", pcap_datadir, pcapFname);
923 FullName[MAXPATHLEN-1] = '\0';
924
925 ClosePcapFile(pcapfile);
926 err = rename(pcap_dumpfile, FullName);
927 if (err) {
928 LogError("rename() pcap failed in %s line %d: %s", __FILE__, __LINE__, strerror(errno) );
929 }
930 dbg_printf("Rotate file: %s -> %s\n", pcap_dumpfile, FullName);
931
932 if ( args->done ) {
933 pthread_mutex_unlock(&pcapfile->m_pbuff);
934 pthread_cond_signal(&pcapfile->c_pbuff);
935 break;
936 }
937
938 // open new files
939 pcapfile = OpenNewPcapFile(pcap_dev->handle, pcap_dumpfile, pcapfile);
940 if (!pcapfile) {
941 args->done = 1;
942 args->exit = 255;
943 pthread_kill(args->parent, SIGUSR1);
944 pthread_mutex_unlock(&pcapfile->m_pbuff);
945 pthread_cond_signal(&pcapfile->c_pbuff);
946 break;
947 }
948
949 }
950 dbg_printf("Flush cycle done\n");
951 pthread_mutex_unlock(&pcapfile->m_pbuff);
952 pthread_cond_signal(&pcapfile->c_pbuff);
953 }
954
955 dbg_printf("End flush thread[%lu]: %i runs\n", (long unsigned)args->tid, runs);
956 pthread_exit((void *)args);
957 /* NOTREACHED */
958
959 } // End of p_pcap_flush_thread
960
p_packet_thread(void * thread_data)961 __attribute__((noreturn)) static void *p_packet_thread(void *thread_data) {
962 // argument dispatching
963 p_packet_thread_args_t *args = (p_packet_thread_args_t *)thread_data;
964 pcap_dev_t *pcap_dev = args->pcap_dev;
965 time_t t_win = args->t_win;
966 char *pcap_datadir = args->pcap_datadir;
967 char *time_extension = args->time_extension;
968 int subdir_index = args->subdir_index;
969 int live = args->live;
970 // locals
971 p_pcap_flush_thread_args_t p_flush_thread_args;
972 pcapfile_t *pcapfile;
973 time_t t_start;
974 int err;
975
976 dbg_printf("New packet thread[%lu]\n", (long unsigned)args->tid);
977 args->done = 0;
978 args->exit = 0;
979
980 err = pthread_setspecific( buffer_key, (void *)args );
981 if ( err ) {
982 LogError("[%lu] pthread_setspecific() error in %s line %d: %s\n",
983 (long unsigned)args->tid, __FILE__, __LINE__, strerror(errno) );
984 args->done = 1;
985 args->exit = 255;
986 pthread_kill(args->parent, SIGUSR1);
987 pthread_exit((void *)args);
988 /* NOTREACHED */
989 }
990
991 /* start flush and pcap file handler thread */
992 if ( pcap_datadir ) {
993 // just allocate pcapfile and buffers - we need to share pcapfile
994 pcapfile = OpenNewPcapFile(pcap_dev->handle, NULL, NULL);
995 if ( !pcapfile ) {
996 args->done = 1;
997 args->exit = 255;
998 pthread_kill(args->parent, SIGUSR1);
999 pthread_exit((void *)args);
1000 /* NOTREACHED */
1001 }
1002
1003 p_flush_thread_args.done = 0;
1004 p_flush_thread_args.exit = 0;
1005 p_flush_thread_args.parent = args->tid;
1006 p_flush_thread_args.pcap_dev = args->pcap_dev;
1007 p_flush_thread_args.subdir_index = subdir_index;
1008 p_flush_thread_args.pcap_datadir = pcap_datadir;
1009 p_flush_thread_args.time_extension = time_extension;
1010 p_flush_thread_args.pcapfile = pcapfile;
1011
1012 err = pthread_create(&p_flush_thread_args.tid, NULL, p_pcap_flush_thread, (void *)&p_flush_thread_args);
1013 if ( err ) {
1014 LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1015 args->done = 1;
1016 args->exit = 255;
1017 pthread_kill(args->parent, SIGUSR1);
1018 pthread_exit((void *)args);
1019 }
1020 dbg_printf("Started flush thread[%lu]\n", (long unsigned)p_flush_thread_args.tid);
1021
1022 } else {
1023 pcapfile = NULL;
1024 }
1025
1026 err = 0;
1027 t_start = 0;
1028 while ( 1 ) {
1029 struct pcap_pkthdr *hdr;
1030 const u_char *data;
1031 struct timeval tv;
1032 time_t t_clock;
1033 int ret;
1034
1035 if ( !args->done ) {
1036 ret = pcap_next_ex(pcap_dev->handle, &hdr, &data);
1037 t_clock = 0;
1038 switch (ret) {
1039 case 1: {
1040 // packet read ok
1041 t_clock = hdr->ts.tv_sec;
1042 // process packet for flow cache
1043 ProcessPacket(args->NodeList, pcap_dev, hdr, data);
1044 if ( pcap_datadir ) {
1045 // keep the packet
1046 if (((t_clock - t_start) >= t_win)) {
1047 // first packet or rotate file
1048 if ( t_start != 0 ) {
1049 RotateFile(pcapfile, t_start, live);
1050 }
1051 // if first packet - set t_start here
1052 t_start = t_clock - (t_clock % t_win);
1053 }
1054 PcapDump(pcapfile, hdr, data);
1055 }
1056 } break;
1057 case 0: {
1058 // live capture idle cycle
1059 dbg_printf("pcap_next_ex() read live - timeout\n");
1060 gettimeofday(&tv, NULL);
1061 t_clock = tv.tv_sec;
1062 if ((t_clock - t_start) >= t_win) { /* rotate file */
1063 if ( t_start ) {
1064 // if not first packet, where t_start = 0
1065 struct FlowNode *Node = New_Node();
1066 Node->t_first = tv;
1067 Node->t_last = tv;
1068 Node->fin = SIGNAL_NODE;
1069 Push_Node(args->NodeList, Node);
1070 if ( pcap_datadir ) {
1071 // keep the packet
1072 RotateFile(pcapfile, t_start, live);
1073 }
1074 LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1075 pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1076 pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1077 }
1078 if ( live ) {
1079 struct pcap_stat p_stat;
1080 if( pcap_stats(pcap_dev->handle, &p_stat) < 0) {
1081 LogInfo("pcap_stats() failed: %s", pcap_geterr(pcapfile->p));
1082 } else {
1083 LogInfo("Dropped: %u, dropped by interface: %u ",
1084 p_stat.ps_drop, p_stat.ps_ifdrop );
1085 }
1086 }
1087 t_start = t_clock - (t_clock % t_win);
1088 memset((void *)&(pcap_dev->proc_stat), 0, sizeof(proc_stat_t));
1089 }
1090 } break;
1091 case -1:
1092 // signal error reading the packet
1093 err = 1;
1094 LogError("pcap_next_ex() read error: '%s'", pcap_geterr(pcap_dev->handle));
1095 args->done = 1;
1096 continue;
1097 break;
1098 case -2: // End of packet file
1099 // signal parent, job is done
1100 err = 1;
1101 LogInfo("pcap_next_ex() end of file");
1102 args->done = 1;
1103 LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1104 pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1105 pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1106 continue;
1107 break;
1108 default:
1109 err = 1;
1110 pcap_breakloop(pcap_dev->handle);
1111 LogError("Unexpected pcap_next_ex() return value: %i", ret);
1112 args->done = 1;
1113 continue;
1114 }
1115
1116 }
1117
1118 if ( args->done )
1119 break;
1120
1121 }
1122
1123 if ( pcap_datadir ) {
1124 dbg_printf("Wait for flush thread to complete\n");
1125 pthread_mutex_lock(&pcapfile->m_pbuff);
1126 while ( pcapfile->alternate_size ) {
1127 pthread_cond_wait(&pcapfile->c_pbuff, &pcapfile->m_pbuff);
1128 }
1129 pcapfile->t_CloseRename = t_start;
1130 pthread_mutex_unlock(&pcapfile->m_pbuff);
1131 dbg_printf("Wait done.\n");
1132
1133 LogInfo("Signal flush thread[%lu] to terminate", p_flush_thread_args.tid);
1134 SignalThreadTerminate((thread_info_t *)&p_flush_thread_args, &pcapfile->c_pbuff);
1135 }
1136
1137 if ( err )
1138 pthread_kill(args->parent, SIGUSR1);
1139
1140 LogInfo("Packet processing stats: Total: %u, Skipped: %u, Unknown: %u, Short snaplen: %u",
1141 pcap_dev->proc_stat.packets, pcap_dev->proc_stat.skipped,
1142 pcap_dev->proc_stat.unknown, pcap_dev->proc_stat.short_snap);
1143 LogInfo("Terminating packet dumping: exit: %i", args->exit);
1144 dbg_printf("End packet thread[%lu]\n", (long unsigned)args->tid);
1145
1146 pthread_exit((void *)args);
1147 /* NOTREACHED */
1148
1149 } /* End of p_packet_thread */
1150
WaitDone(void)1151 static void WaitDone(void) {
1152 sigset_t signal_set;
1153 int done, sig;
1154 pthread_t tid = pthread_self();
1155
1156 LogInfo("[%lu] WaitDone() waiting", (long unsigned)tid);
1157
1158 sigemptyset(&signal_set);
1159 sigaddset(&signal_set, SIGINT);
1160 sigaddset(&signal_set, SIGHUP);
1161 sigaddset(&signal_set, SIGTERM);
1162 sigaddset(&signal_set, SIGUSR1);
1163 pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
1164
1165 done = 0;
1166 while ( !done ) {
1167 sigwait(&signal_set, &sig);
1168 LogInfo("[%lu] WaitDone() signal %i", (long unsigned)tid, sig);
1169 switch ( sig ) {
1170 case SIGHUP:
1171 break;
1172 case SIGINT:
1173 case SIGTERM:
1174 pthread_mutex_lock(&m_done);
1175 done = 1;
1176 pthread_mutex_unlock(&m_done);
1177 pthread_cond_signal(&terminate);
1178 break;
1179 case SIGUSR1:
1180 // child signals end of job
1181 done = 1;
1182 break;
1183 // default:
1184 // empty
1185 }
1186 }
1187
1188 } // End of WaitDone
1189
main(int argc,char * argv[])1190 int main(int argc, char *argv[]) {
1191 sigset_t signal_set;
1192 struct sigaction sa;
1193 int c, snaplen, err, do_daemonize;
1194 int subdir_index, compress, expire, cache_size;
1195 int active, inactive;
1196 FlowSource_t *fs;
1197 dirstat_t *dirstat;
1198 time_t t_win;
1199 char *device, *pcapfile, *filter, *datadir, *pcap_datadir, *extension_tags, pidfile[MAXPATHLEN], pidstr[32];
1200 char *Ident, *userid, *groupid;
1201 char *time_extension;
1202 pcap_dev_t *pcap_dev;
1203 p_packet_thread_args_t *p_packet_thread_args;
1204 p_flow_thread_args_t *p_flow_thread_args;
1205
1206 snaplen = 1526;
1207 do_daemonize = 0;
1208 launcher_pid = 0;
1209 device = NULL;
1210 pcapfile = NULL;
1211 filter = NULL;
1212 pidfile[0] = '\0';
1213 t_win = TIME_WINDOW;
1214 datadir = DEFAULT_DIR;
1215 pcap_datadir = NULL;
1216 userid = groupid = NULL;
1217 Ident = "none";
1218 fs = NULL;
1219 extension_tags = DefaultExtensions;
1220 time_extension = "%Y%m%d%H%M";
1221 subdir_index = 0;
1222 compress = NOT_COMPRESSED;
1223 verbose = 0;
1224 expire = 0;
1225 cache_size = 0;
1226 active = 0;
1227 inactive = 0;
1228 while ((c = getopt(argc, argv, "B:DEI:e:g:hi:j:r:s:l:p:P:t:u:S:T:Vyz")) != EOF) {
1229 switch (c) {
1230 struct stat fstat;
1231 case 'h':
1232 usage(argv[0]);
1233 exit(0);
1234 break;
1235 case 'u':
1236 userid = optarg;
1237 break;
1238 case 'g':
1239 groupid = optarg;
1240 break;
1241 case 'D':
1242 do_daemonize = 1;
1243 break;
1244 case 'B':
1245 cache_size = atoi(optarg);
1246 if (cache_size <= 0) {
1247 LogError("ERROR: Cache size must not be < 0");
1248 exit(EXIT_FAILURE);
1249 }
1250 break;
1251 case 'I':
1252 Ident = strdup(optarg);
1253 break;
1254 case 'i':
1255 device = optarg;
1256 break;
1257 case 'l':
1258 datadir = optarg;
1259 err = stat(datadir, &fstat);
1260 if (!(fstat.st_mode & S_IFDIR)) {
1261 LogError("No such directory: " "'%s'", datadir);
1262 break;
1263 }
1264 break;
1265 case 'p':
1266 pcap_datadir = optarg;
1267 err = stat(pcap_datadir, &fstat);
1268 if (!(fstat.st_mode & S_IFDIR)) {
1269 LogError("No such directory: " "'%s'", pcap_datadir);
1270 break;
1271 }
1272 break;
1273 case 'r': {
1274 struct stat stat_buf;
1275 pcapfile = optarg;
1276 if ( stat(pcapfile, &stat_buf) ) {
1277 LogError("Can't stat '%s': %s", pcapfile, strerror(errno));
1278 exit(EXIT_FAILURE);
1279 }
1280 if (!S_ISREG(stat_buf.st_mode) ) {
1281 LogError("'%s' is not a file", pcapfile);
1282 exit(EXIT_FAILURE);
1283 }
1284 } break;
1285 case 's':
1286 snaplen = atoi(optarg);
1287 if (snaplen < 14 + 20 + 20) { // ethernet, IP , TCP, no payload
1288 LogError("ERROR:, snaplen < sizeof IPv4 - Need 54 bytes for TCP/IPv4");
1289 exit(EXIT_FAILURE);
1290 }
1291 break;
1292 case 'e': {
1293 if ( strlen(optarg) > 16 ) {
1294 LogError("ERROR:, size timeout values too big");
1295 exit(EXIT_FAILURE);
1296 }
1297 char *s = strdup(optarg);
1298 char *sep = strchr(s, ',');
1299 if ( !sep ) {
1300 LogError("ERROR:, timeout values format error");
1301 exit(EXIT_FAILURE);
1302 }
1303 *sep = '\0';
1304 sep++;
1305 active = atoi(s);
1306 inactive = atoi(sep);
1307 if (snaplen < 14 + 20 + 20) { // ethernet, IP , TCP, no payload
1308 LogError("ERROR:, snaplen < sizeof IPv4 - Need 54 bytes for TCP/IPv4");
1309 exit(EXIT_FAILURE);
1310 }
1311 } break;
1312 case 't':
1313 t_win = atoi(optarg);
1314 if (t_win < 2) {
1315 LogError("time interval <= 2s not allowed");
1316 exit(EXIT_FAILURE);
1317 }
1318 if (t_win < 60) {
1319 time_extension = "%Y%m%d%H%M%S";
1320 }
1321 break;
1322 case 'j':
1323 if ( compress ) {
1324 LogError("Use either -z for LZO or -j for BZ2 compression, but not both\n");
1325 exit(255);
1326 }
1327 compress = BZ2_COMPRESSED;
1328 break;
1329 case 'y':
1330 if ( compress ) {
1331 LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
1332 exit(255);
1333 }
1334 compress = LZ4_COMPRESSED;
1335 break;
1336 case 'z':
1337 if ( compress ) {
1338 LogError("Use either -z for LZO or -j for BZ2 compression, but not both\n");
1339 exit(255);
1340 }
1341 compress = LZO_COMPRESSED;
1342 break;
1343 case 'P':
1344 if ( optarg[0] == '/' ) { // absolute path given
1345 strncpy(pidfile, optarg, MAXPATHLEN-1);
1346 } else { // path relative to current working directory
1347 char tmp[MAXPATHLEN];
1348 if ( !getcwd(tmp, MAXPATHLEN-1) ) {
1349 fprintf(stderr, "Failed to get current working directory: %s\n", strerror(errno));
1350 exit(255);
1351 }
1352 tmp[MAXPATHLEN-1] = 0;
1353 if ( (strlen(tmp) + strlen(optarg) + 3) < MAXPATHLEN ) {
1354 snprintf(pidfile, MAXPATHLEN - 3 - strlen(tmp), "%s/%s", tmp, optarg);
1355 } else {
1356 fprintf(stderr, "pidfile MAXPATHLEN error:\n");
1357 exit(255);
1358 }
1359 }
1360 // pidfile now absolute path
1361 pidfile[MAXPATHLEN-1] = 0;
1362 break;
1363 case 'S':
1364 subdir_index = atoi(optarg);
1365 break;
1366 case 'T': {
1367 size_t len = strlen(optarg);
1368 extension_tags = optarg;
1369 if ( len == 0 || len > 128 ) {
1370 fprintf(stderr, "Extension length error. Unexpected option '%s'\n", extension_tags);
1371 exit(255);
1372 }
1373 break; }
1374 case 'E':
1375 verbose = 1;
1376 Setv6Mode(1);
1377 break;
1378 case 'V':
1379 printf("%s: Version: %s\n",argv[0], nfdump_version);
1380 exit(0);
1381 break;
1382 default:
1383 usage(argv[0]);
1384 exit(EXIT_FAILURE);
1385 }
1386 }
1387
1388 if (argc - optind > 1) {
1389 usage(argv[0]);
1390 exit(EXIT_FAILURE);
1391 } else {
1392 /* user specified a pcap filter */
1393 filter = argv[optind];
1394 }
1395
1396 if ( fs == NULL && datadir != NULL && !AddDefaultFlowSource(&fs, Ident, datadir) ) {
1397 fprintf(stderr, "Failed to add default data collector directory\n");
1398 exit(255);
1399 }
1400
1401 if ( device && pcapfile ) {
1402 LogError("Specify either a device or a pcapfile, but not both");
1403 exit(EXIT_FAILURE);
1404 }
1405
1406 if ( !device && !pcapfile ) {
1407 LogError("Specify either a device or a pcapfile to read packets from");
1408 exit(EXIT_FAILURE);
1409 }
1410
1411 if ( !Init_FlowTree(cache_size, active, inactive)) {
1412 LogError("Init_FlowTree() failed.");
1413 exit(EXIT_FAILURE);
1414 }
1415
1416 InitExtensionMaps(NO_EXTENSION_LIST);
1417 SetupExtensionDescriptors(strdup(extension_tags));
1418
1419 if ( pcapfile ) {
1420 pcap_dev = setup_pcap_file(pcapfile, filter, snaplen);
1421 } else {
1422 pcap_dev = setup_pcap_live(device, filter, snaplen);
1423 }
1424 if (!pcap_dev) {
1425 exit(EXIT_FAILURE);
1426 }
1427
1428 SetPriv(userid, groupid);
1429
1430 if ( subdir_index && !InitHierPath(subdir_index) ) {
1431 pcap_close(pcap_dev->handle);
1432 exit(255);
1433 }
1434
1435 if ( !InitLog(do_daemonize, argv[0], SYSLOG_FACILITY, verbose) ) {
1436 pcap_close(pcap_dev->handle);
1437 exit(255);
1438 }
1439
1440 // check if pid file exists and if so, if a process with registered pid is running
1441 if ( strlen(pidfile) ) {
1442 int pidf;
1443 pidf = open(pidfile, O_RDONLY, 0);
1444 if ( pidf > 0 ) {
1445 // pid file exists
1446 char s[32];
1447 ssize_t len;
1448 len = read(pidf, (void *)s, 31);
1449 close(pidf);
1450 s[31] = '\0';
1451 if ( len < 0 ) {
1452 fprintf(stderr, "read() error existing pid file: %s\n", strerror(errno));
1453 pcap_close(pcap_dev->handle);
1454 exit(255);
1455 } else {
1456 unsigned long pid = atol(s);
1457 if ( pid == 0 ) {
1458 // garbage - use this file
1459 unlink(pidfile);
1460 } else {
1461 if ( kill(pid, 0) == 0 ) {
1462 // process exists
1463 fprintf(stderr, "A process with pid %lu registered in pidfile %s is already running!\n",
1464 pid, strerror(errno));
1465 pcap_close(pcap_dev->handle);
1466 exit(255);
1467 } else {
1468 // no such process - use this file
1469 unlink(pidfile);
1470 }
1471 }
1472 }
1473 } else {
1474 if ( errno != ENOENT ) {
1475 fprintf(stderr, "open() error existing pid file: %s\n", strerror(errno));
1476 pcap_close(pcap_dev->handle);
1477 exit(255);
1478 } // else errno == ENOENT - no file - this is fine
1479 }
1480 }
1481
1482 if ( do_daemonize ) {
1483 verbose = 0;
1484 daemonize();
1485 }
1486
1487 if (strlen(pidfile)) {
1488 pid_t pid = getpid();
1489 int pidf = open(pidfile, O_RDWR|O_TRUNC|O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
1490 if ( pidf == -1 ) {
1491 LogError("Error opening pid file: '%s' %s", pidfile, strerror(errno));
1492 pcap_close(pcap_dev->handle);
1493 exit(255);
1494 }
1495 snprintf(pidstr,31,"%lu\n", (unsigned long)pid);
1496 if ( write(pidf, pidstr, strlen(pidstr)) <= 0 ) {
1497 LogError("Error write pid file: '%s' %s", pidfile, strerror(errno));
1498 }
1499 close(pidf);
1500 }
1501
1502 if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
1503 LogError("initialize bookkeeper failed.");
1504 pcap_close(pcap_dev->handle);
1505 exit(255);
1506 }
1507
1508 // Init the extension map list
1509 if ( !InitExtensionMapList(fs) ) {
1510 pcap_close(pcap_dev->handle);
1511 exit(255);
1512 }
1513
1514 IPFragTree_init();
1515
1516 LogInfo("Startup.");
1517 // prepare signal mask for all threads
1518 // block signals, as they are handled by the main thread
1519 // mask is inherited by all threads
1520 sigemptyset(&signal_set);
1521 sigaddset(&signal_set, SIGINT);
1522 sigaddset(&signal_set, SIGHUP);
1523 sigaddset(&signal_set, SIGTERM);
1524 sigaddset(&signal_set, SIGUSR1);
1525 sigaddset(&signal_set, SIGPIPE);
1526 pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
1527
1528 // for USR2 set a signal handler, which interrupts blocking
1529 // system calls - and signals done event
1530 // handler applies for all threads in a process
1531 sa.sa_handler = Interrupt_handler;
1532 sigemptyset(&sa.sa_mask);
1533 sa.sa_flags = 0;
1534 sigaction(SIGPIPE, &sa, NULL);
1535 sigaction(SIGUSR2, &sa, NULL);
1536
1537 // key for each thread
1538 err = pthread_key_create(&buffer_key, NULL);
1539 if ( err ) {
1540 LogError("pthread_key() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1541 exit(255);
1542 }
1543
1544 // prepare flow thread args
1545 p_flow_thread_args = (p_flow_thread_args_t *)malloc(sizeof(p_flow_thread_args_t));
1546 if ( !p_flow_thread_args ) {
1547 LogError("malloc() error in %s line %d: %s\n",
1548 __FILE__, __LINE__, strerror(errno) );
1549 exit(255);
1550 }
1551 p_flow_thread_args->fs = fs;
1552 p_flow_thread_args->t_win = t_win;
1553 p_flow_thread_args->compress = compress;
1554 p_flow_thread_args->live = device != NULL;
1555 p_flow_thread_args->subdir_index = subdir_index;
1556 p_flow_thread_args->parent = pthread_self();
1557 p_flow_thread_args->NodeList = NewNodeList();
1558 p_flow_thread_args->time_extension = time_extension;
1559
1560 err = 0;
1561
1562 err = pthread_create(&p_flow_thread_args->tid, NULL, p_flow_thread, (void *)p_flow_thread_args);
1563 if ( err ) {
1564 LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1565 exit(255);
1566 }
1567 dbg_printf("Started flow thread[%lu]\n", (long unsigned)p_flow_thread_args->tid);
1568
1569 // prepare packet thread args
1570 p_packet_thread_args = (p_packet_thread_args_t *)malloc(sizeof(p_packet_thread_args_t));
1571 if ( !p_packet_thread_args ) {
1572 LogError("malloc() error in %s line %d: %s\n",
1573 __FILE__, __LINE__, strerror(errno) );
1574 exit(255);
1575 }
1576 p_packet_thread_args->pcap_dev = pcap_dev;
1577 p_packet_thread_args->t_win = t_win;
1578 p_packet_thread_args->subdir_index = subdir_index;
1579 p_packet_thread_args->pcap_datadir = pcap_datadir;
1580 p_packet_thread_args->live = device != NULL;
1581 p_packet_thread_args->parent = pthread_self();
1582 p_packet_thread_args->NodeList = p_flow_thread_args->NodeList;
1583 p_packet_thread_args->time_extension = p_flow_thread_args->time_extension;
1584
1585 err = pthread_create(&p_packet_thread_args->tid, NULL, p_packet_thread, (void *)p_packet_thread_args);
1586 if ( err ) {
1587 LogError("pthread_create() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
1588 exit(255);
1589 }
1590 dbg_printf("Started packet thread[%lu]\n", (long unsigned)p_packet_thread_args->tid);
1591
1592 // Wait till done
1593 WaitDone();
1594
1595 dbg_printf("Signal packet thread to terminate\n");
1596 SignalThreadTerminate((thread_info_t *)p_packet_thread_args, NULL);
1597
1598 dbg_printf("Signal flow thread to terminate\n");
1599 SignalThreadTerminate((thread_info_t *)p_flow_thread_args, &p_packet_thread_args->NodeList->c_list);
1600
1601 IPFragTree_free();
1602
1603 // free arg list
1604 free((void *)p_packet_thread_args);
1605 free((void *)p_flow_thread_args);
1606
1607 LogInfo("Terminating nfpcapd.");
1608
1609 if ( expire == 0 && ReadStatInfo(fs->datadir, &dirstat, LOCK_IF_EXISTS) == STATFILE_OK ) {
1610 UpdateBookStat(dirstat, fs->bookkeeper);
1611 WriteStatInfo(dirstat);
1612 LogInfo("Updating statinfo in directory '%s'", datadir);
1613 }
1614
1615 ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1616 pcap_close(pcap_dev->handle);
1617
1618 if ( strlen(pidfile) )
1619 unlink(pidfile);
1620
1621 EndLog();
1622
1623 exit(EXIT_SUCCESS);
1624 } /* End of main */
1625
1626