1 /*
2  *  Copyright (c) 2009-2020, Peter Haag
3  *  Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4  *  All rights reserved.
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright notice,
12  *     this list of conditions and the following disclaimer in the documentation
13  *     and/or other materials provided with the distribution.
14  *   * Neither the name of the author nor the names of its contributors may be
15  *     used to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  *  POSSIBILITY OF SUCH DAMAGE.
29  *
30  */
31 
32 #include "config.h"
33 
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <stdarg.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <time.h>
42 #include <sys/time.h>
43 #include <netdb.h>
44 #include <pwd.h>
45 #include <grp.h>
46 #include <unistd.h>
47 #include <sys/wait.h>
48 #include <sys/stat.h>
49 #include <sys/param.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
52 #include <fcntl.h>
53 #include <signal.h>
54 #include <sys/mman.h>
55 #include <string.h>
56 #include <dirent.h>
57 
58 #ifdef PCAP
59 #include "pcap_reader.h"
60 #endif
61 
62 #ifdef HAVE_STDINT_H
63 #include <stdint.h>
64 #endif
65 
66 #include "util.h"
67 #include "nfdump.h"
68 #include "nffile.h"
69 #include "nfx.h"
70 #include "exporter.h"
71 #include "nfnet.h"
72 #include "flist.h"
73 #include "nfstatfile.h"
74 #include "bookkeeper.h"
75 #include "launch.h"
76 #include "collector.h"
77 #include "netflow_v1.h"
78 #include "netflow_v5_v7.h"
79 #include "netflow_v9.h"
80 #include "ipfix.h"
81 
82 #ifdef HAVE_FTS_H
83 #   include <fts.h>
84 #else
85 #   include "fts_compat.h"
86 #define fts_children fts_children_compat
87 #define fts_close fts_close_compat
88 #define fts_open  fts_open_compat
89 #define fts_read  fts_read_compat
90 #define fts_set   fts_set_compat
91 #endif
92 
93 #include "expire.h"
94 
95 #define DEFAULTCISCOPORT "9995"
96 #define DEFAULTHOSTNAME "127.0.0.1"
97 #define SENDSOCK_BUFFSIZE 200000
98 
99 static void *shmem = NULL;
100 static int verbose = 0;
101 static uint32_t default_sampling   = 1;
102 static uint32_t overwrite_sampling = 0;
103 
104 extern uint32_t default_sampling;   // the default sampling rate when nothing else applies. set by -S
105 extern uint32_t overwrite_sampling;	// unconditionally overwrite sampling rate with given sampling rate -S
106 
107 // Define a generic type to get data from socket or pcap file
108 typedef ssize_t (*packet_function_t)(int, void *, size_t, int, struct sockaddr *, socklen_t *);
109 
110 /* module limited globals */
111 static FlowSource_t *FlowSource;
112 
113 static int done, launcher_alive, periodic_trigger, launcher_pid;
114 
115 static const char *nfdump_version = VERSION;
116 
117 
118 /* Local function Prototypes */
119 static void usage(char *name);
120 
121 static void kill_launcher(int pid);
122 
123 static void IntHandler(int signal);
124 
125 static inline FlowSource_t *GetFlowSource(struct sockaddr_storage *ss);
126 
127 static void daemonize(void);
128 
129 static void SetPriv(char *userid, char *groupid );
130 
131 static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
132 	time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress);
133 
134 /* Functions */
usage(char * name)135 static void usage(char *name) {
136 		printf("usage %s [options] \n"
137 					"-h\t\tthis text you see right here\n"
138 					"-u userid\tChange user to username\n"
139 					"-g groupid\tChange group to groupname\n"
140 					"-w\t\tSync file rotation with next 5min (default) interval\n"
141 					"-t interval\tset the interval to rotate nfcapd files\n"
142 					"-b host\t\tbind socket to host/IP addr\n"
143 					"-J mcastgroup\tJoin multicast group <mcastgroup>\n"
144 					"-p portnum\tlisten on port portnum\n"
145 					"-l basdir \tset the output directory. (no default) \n"
146 					"-S subdir\tSub directory format. see nfcapd(1) for format\n"
147 					"-I Ident\tset the ident string for stat file. (default 'none')\n"
148 					"-H Add port histogram data to flow file.(default 'no')\n"
149 					"-n Ident,IP,logdir\tAdd this flow source - multiple streams\n"
150 					"-M dir \t\tSet the output directory for dynamic sources.\n"
151 
152 					"-P pidfile\tset the PID file\n"
153 					"-R IP[/port]\tRepeat incoming packets to IP address/port. Max 8 repeaters.\n"
154 					"-s rate\tset default sampling rate (default 1)\n"
155 					"-x process\tlaunch process after a new file becomes available\n"
156 					"-z\t\tLZO compress flows in output file.\n"
157 					"-y\t\tLZ4 compress flows in output file.\n"
158 					"-j\t\tBZ2 compress flows in output file.\n"
159 					"-B bufflen\tSet socket buffer to bufflen bytes\n"
160 					"-e\t\tExpire data at each cycle.\n"
161 					"-D\t\tFork to background\n"
162 					"-E\t\tPrint extended format of netflow data. For debugging purpose only.\n"
163 					"-T\t\tInclude extension tags in records.\n"
164 					"-4\t\tListen on IPv4 (default).\n"
165 					"-6\t\tListen on IPv6.\n"
166 					"-V\t\tPrint version and exit.\n"
167 					"-Z\t\tAdd timezone offset to filename.\n"
168 					, name);
169 } // End of usage
170 
kill_launcher(int pid)171 void kill_launcher(int pid) {
172 int stat, i;
173 pid_t ret;
174 
175 	if ( pid == 0 )
176 		return;
177 
178 	if ( launcher_alive ) {
179 		LogInfo("Signal launcher[%i] to terminate.", pid);
180 		kill(pid, SIGTERM);
181 
182 		// wait for launcher to teminate
183 		for ( i=0; i<LAUNCHER_TIMEOUT; i++ ) {
184 			if ( !launcher_alive )
185 				break;
186 			sleep(1);
187 		}
188 		if ( i >= LAUNCHER_TIMEOUT ) {
189 			LogError("Launcher does not want to terminate - signal again");
190 			kill(pid, SIGTERM);
191 			sleep(1);
192 		}
193 	} else {
194 		LogError("launcher[%i] already dead.", pid);
195 	}
196 
197 	if ( (ret = waitpid (pid, &stat, 0)) == -1 ) {
198 		LogError("wait for launcher failed: %s %i", strerror(errno), ret);
199 	} else {
200 		if ( WIFEXITED(stat) ) {
201 			LogInfo("launcher exit status: %i", WEXITSTATUS(stat));
202 		}
203 		if (  WIFSIGNALED(stat) ) {
204 			LogError("launcher terminated due to signal %i", WTERMSIG(stat));
205 		}
206 	}
207 
208 } // End of kill_launcher
209 
IntHandler(int signal)210 static void IntHandler(int signal) {
211 
212 	switch (signal) {
213 		case SIGALRM:
214 			periodic_trigger = 1;
215 			break;
216 		case SIGHUP:
217 		case SIGINT:
218 		case SIGTERM:
219 			done = 1;
220 			break;
221 		case SIGCHLD:
222 			launcher_alive = 0;
223 			break;
224 		default:
225 			// ignore everything we don't know
226 			break;
227 	}
228 
229 } /* End of IntHandler */
230 
231 
daemonize(void)232 static void daemonize(void) {
233 int fd;
234 	switch (fork()) {
235 		case 0:
236 			// child
237 			break;
238 		case -1:
239 			// error
240 			fprintf(stderr, "fork() error: %s\n", strerror(errno));
241 			exit(0);
242 			break;
243 		default:
244 			// parent
245 			_exit(0);
246 	}
247 
248 	if (setsid() < 0) {
249 		fprintf(stderr, "setsid() error: %s\n", strerror(errno));
250 		exit(0);
251 	}
252 
253 	// Double fork
254 	switch (fork()) {
255 		case 0:
256 			// child
257 			break;
258 		case -1:
259 			// error
260 			fprintf(stderr, "fork() error: %s\n", strerror(errno));
261 			exit(0);
262 			break;
263 		default:
264 			// parent
265 			_exit(0);
266 	}
267 
268 	fd = open("/dev/null", O_RDONLY);
269 	if (fd != 0) {
270 		dup2(fd, 0);
271 		close(fd);
272 	}
273 	fd = open("/dev/null", O_WRONLY);
274 	if (fd != 1) {
275 		dup2(fd, 1);
276 		close(fd);
277 	}
278 	fd = open("/dev/null", O_WRONLY);
279 	if (fd != 2) {
280 		dup2(fd, 2);
281 		close(fd);
282 	}
283 
284 } // End of daemonize
285 
SetPriv(char * userid,char * groupid)286 static void SetPriv(char *userid, char *groupid ) {
287 struct 	passwd *pw_entry;
288 struct 	group *gr_entry;
289 uid_t	myuid, newuid, newgid;
290 int		err;
291 
292 	if ( userid == 0 && groupid == 0 )
293 		return;
294 
295 	newuid = newgid = 0;
296 	myuid = getuid();
297 	if ( myuid != 0 ) {
298 		LogError("Only root wants to change uid/gid");
299 		fprintf(stderr, "ERROR: Only root wants to change uid/gid\n");
300 		exit(255);
301 	}
302 
303 	if ( userid ) {
304 		pw_entry = getpwnam(userid);
305 		newuid = pw_entry ? pw_entry->pw_uid : atol(userid);
306 
307 		if ( newuid == 0 ) {
308 			fprintf (stderr,"Invalid user '%s'\n", userid);
309 			exit(255);
310 		}
311 	}
312 
313 	if ( groupid ) {
314 		gr_entry = getgrnam(groupid);
315 		newgid = gr_entry ? gr_entry->gr_gid : atol(groupid);
316 
317 		if ( newgid == 0 ) {
318 			fprintf (stderr,"Invalid group '%s'\n", groupid);
319 			exit(255);
320 		}
321 
322 		err = setgid(newgid);
323 		if ( err ) {
324 			LogError("Can't set group id %ld for group '%s': %s",   (long)newgid, groupid, strerror(errno));
325 			fprintf (stderr,"Can't set group id %ld for group '%s': %s\n", (long)newgid, groupid, strerror(errno));
326 			exit(255);
327 		}
328 
329 	}
330 
331 	if ( newuid ) {
332 		err = setuid(newuid);
333 		if ( err ) {
334 			LogError("Can't set user id %ld for user '%s': %s",   (long)newuid, userid, strerror(errno));
335 			fprintf (stderr,"Can't set user id %ld for user '%s': %s\n", (long)newuid, userid, strerror(errno));
336 			exit(255);
337 		}
338 	}
339 
340 } // End of SetPriv
341 
format_file_block_header(data_block_header_t * header)342 static void format_file_block_header(data_block_header_t *header) {
343 
344 	printf("\n"
345 "File Block Header: \n"
346 "  NumBlocks     =  %10u\n"
347 "  Size          =  %10u\n"
348 "  id         	 =  %10u\n",
349 		header->NumRecords,
350 		header->size,
351 		header->id);
352 
353 } // End of format_file_block_header
354 
355 #include "nffile_inline.c"
356 #include "collector_inline.c"
357 
run(packet_function_t receive_packet,int socket,repeater_t * repeater,time_t twin,time_t t_begin,int report_seq,int use_subdirs,char * time_extension,int compress)358 static void run(packet_function_t receive_packet, int socket, repeater_t *repeater,
359 	time_t twin, time_t t_begin, int report_seq, int use_subdirs, char *time_extension, int compress) {
360 common_flow_header_t	*nf_header;
361 FlowSource_t			*fs;
362 struct sockaddr_storage nf_sender;
363 socklen_t 	nf_sender_size = sizeof(nf_sender);
364 time_t 		t_start, t_now;
365 uint64_t	export_packets;
366 uint32_t	blast_cnt, blast_failures, ignored_packets;
367 uint16_t	version;
368 ssize_t		cnt;
369 void 		*in_buff;
370 int 		err;
371 srecord_t	*commbuff;
372 
373 	if ( !Init_v1(verbose) || !Init_v5_v7_input(verbose, default_sampling, overwrite_sampling) ||
374 		 !Init_v9(verbose, default_sampling, overwrite_sampling) || !Init_IPFIX(verbose, default_sampling, overwrite_sampling) )
375 		return;
376 
377 	in_buff  = malloc(NETWORK_INPUT_BUFF_SIZE);
378 	if ( !in_buff ) {
379 		LogError("malloc() allocation error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
380 		return;
381 	}
382 
383 	// init vars
384 	commbuff = (srecord_t *)shmem;
385 	nf_header = (common_flow_header_t *)in_buff;
386 
387 	// Init each netflow source output data buffer
388 	fs = FlowSource;
389 	while ( fs ) {
390 
391 		// prepare file
392 		fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
393 		if ( !fs->nffile ) {
394 			return;
395 		}
396 		// init vars
397 		fs->bad_packets		= 0;
398 		fs->first_seen      = 0xffffffffffffLL;
399 		fs->last_seen 		= 0;
400 
401 		// next source
402 		fs = fs->next;
403 	}
404 
405 	export_packets = blast_cnt = blast_failures = 0;
406 	t_start = t_begin;
407 
408 	cnt = 0;
409 	periodic_trigger = 0;
410 	ignored_packets  = 0;
411 
412 	// wake up at least at next time slot (twin) + 1s
413 	alarm(t_start + twin + 1 - time(NULL));
414 	/*
415 	 * Main processing loop:
416 	 * this loop, continues until done = 1, set by the signal handler
417 	 * The while loop will be breaked by the periodic file renaming code
418 	 * for proper cleanup
419 	 */
420 	while ( 1 ) {
421 		struct timeval tv;
422 		int i;
423 
424 		/* read next bunch of data into beginn of input buffer */
425 		if ( !done) {
426 #ifdef PCAP
427 			// Debug code to read from pcap file, or from socket
428 			cnt = receive_packet(socket, in_buff, NETWORK_INPUT_BUFF_SIZE , 0,
429 						(struct sockaddr *)&nf_sender, &nf_sender_size);
430 
431 			// in case of reading from file EOF => -2
432 			if ( cnt == -2 )
433 				done = 1;
434 #else
435 			cnt = recvfrom (socket, in_buff, NETWORK_INPUT_BUFF_SIZE , 0,
436 						(struct sockaddr *)&nf_sender, &nf_sender_size);
437 #endif
438 
439 			if ( cnt == -1 && errno != EINTR ) {
440 				LogError("ERROR: recvfrom: %s", strerror(errno));
441 				continue;
442 			}
443 
444 			i = 0;
445 			while ( repeater[i].hostname && (i < MAX_REPEATERS)) {
446 				ssize_t len;
447 				len = sendto(repeater[i].sockfd, in_buff, cnt, 0,
448 						(struct sockaddr *)&(repeater[i].addr), repeater[i].addrlen);
449 				if ( len < 0 ) {
450 					LogError("ERROR: sendto(): %s", strerror(errno));
451 				}
452 				i++;
453 			}
454 		}
455 
456 		/* Periodic file renaming, if time limit reached or if we are done.  */
457 		// t_now = time(NULL);
458 		gettimeofday(&tv, NULL);
459 		t_now = tv.tv_sec;
460 
461 		if ( ((t_now - t_start) >= twin) || done ) {
462 			struct  tm *now;
463 			char	*subdir, fmt[MAXTIMESTRING];
464 
465 			alarm(0);
466 			now = localtime(&t_start);
467 			strftime(fmt, sizeof(fmt), time_extension, now);
468 
469 			// prepare sub dir hierarchy
470 			if ( use_subdirs ) {
471 				subdir = GetSubDir(now);
472 				if ( !subdir ) {
473 					// failed to generate subdir path - put flows into base directory
474 					LogError("Failed to create subdir path!");
475 
476 					// failed to generate subdir path - put flows into base directory
477 					subdir = NULL;
478 				}
479 			} else {
480 				subdir = NULL;
481 			}
482 
483 			// for each flow source update the stats, close the file and re-initialize the new file
484 			fs = FlowSource;
485 			while ( fs ) {
486 				char nfcapd_filename[MAXPATHLEN];
487 				char error[255];
488 				nffile_t *nffile = fs->nffile;
489 
490 				if ( verbose ) {
491 					// Dump to stdout
492 					format_file_block_header(nffile->block_header);
493 				}
494 
495 				if ( nffile->block_header->NumRecords ) {
496 					// flush current buffer to disc
497 					if ( WriteBlock(nffile) <= 0 )
498 						LogError("Ident: %s, failed to write output buffer to disk: '%s'" ,
499 							fs->Ident, strerror(errno));
500 				} // else - no new records in current block
501 
502 
503 				// prepare filename
504 				if ( subdir ) {
505 					if ( SetupSubDir(fs->datadir, subdir, error, 255) ) {
506 						snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/%s/nfcapd.%s", fs->datadir, subdir, fmt);
507 					} else {
508 						LogError("Ident: %s, Failed to create sub hier directories: %s", fs->Ident, error );
509 						// skip subdir - put flows directly into current directory
510 						snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/nfcapd.%s", fs->datadir, fmt);
511 					}
512 				} else {
513 					snprintf(nfcapd_filename, MAXPATHLEN-1, "%s/nfcapd.%s", fs->datadir, fmt);
514 				}
515 				nfcapd_filename[MAXPATHLEN-1] = '\0';
516 
517 				// update stat record
518 				// if no flows were collected, fs->last_seen is still 0
519 				// set first_seen to start of this time slot, with twin window size.
520 				if ( fs->last_seen == 0 ) {
521 					fs->first_seen = (uint64_t)1000 * (uint64_t)t_start;
522 					fs->last_seen  = (uint64_t)1000 * (uint64_t)(t_start + twin);
523 				}
524 				nffile->stat_record->first_seen = fs->first_seen/1000;
525 				nffile->stat_record->msec_first	= fs->first_seen - nffile->stat_record->first_seen*1000;
526 				nffile->stat_record->last_seen 	= fs->last_seen/1000;
527 				nffile->stat_record->msec_last	= fs->last_seen - nffile->stat_record->last_seen*1000;
528 
529 				// Flush Exporter Stat to file
530 				FlushExporterStats(fs);
531 				// Close file
532 				CloseUpdateFile(nffile, fs->Ident);
533 
534 				// if rename fails, we are in big trouble, as we need to get rid of the old .current file
535 				// otherwise, we will loose flows and can not continue collecting new flows
536 				err = rename(fs->current, nfcapd_filename);
537 				if ( err ) {
538 					LogError("Ident: %s, Can't rename dump file: %s", fs->Ident,  strerror(errno));
539 					LogError("Ident: %s, Serious Problem! Fix manually", fs->Ident);
540 					if ( launcher_pid )
541 						commbuff->failed = 1;
542 
543 					// we do not update the books here, as the file failed to rename properly
544 					// otherwise the books may be wrong
545 				} else {
546 					struct stat	fstat;
547 					if ( launcher_pid )
548 						commbuff->failed = 0;
549 
550 					// Update books
551 					stat(nfcapd_filename, &fstat);
552 					UpdateBooks(fs->bookkeeper, t_start, 512*fstat.st_blocks);
553 				}
554 
555 				// log stats
556 				LogInfo("Ident: '%s' Flows: %llu, Packets: %llu, Bytes: %llu, Sequence Errors: %u, Bad Packets: %u",
557 					fs->Ident, (unsigned long long)nffile->stat_record->numflows,
558 					(unsigned long long)nffile->stat_record->numpackets,
559 					(unsigned long long)nffile->stat_record->numbytes, nffile->stat_record->sequence_failure, fs->bad_packets);
560 
561 				// reset stats
562 				fs->bad_packets = 0;
563 				fs->first_seen  = 0xffffffffffffLL;
564 				fs->last_seen 	= 0;
565 
566 				if ( !done ) {
567 					nffile = OpenNewFile(fs->current, nffile, compress, 0, NULL);
568 					if ( !nffile ) {
569 						LogError("killed due to fatal error: ident: %s", fs->Ident);
570 						break;
571 					}
572 				}
573 
574 				// Dump all extension maps and exporters to the buffer
575 				FlushStdRecords(fs);
576 
577 				// next flow source
578 				fs = fs->next;
579 
580 			} // end of while (fs)
581 
582 			// trigger launcher if required
583 			if ( launcher_pid ) {
584 				// Signal launcher
585 
586 				strncpy(commbuff->tstring, fmt, MAXTIMESTRING);
587 				commbuff->tstring[MAXTIMESTRING-1] = '\0';
588 
589 				commbuff->tstamp = t_start;
590 				if ( subdir ) {
591 					snprintf(commbuff->fname, MAXPATHLEN-1, "%s/nfcapd.%s", subdir, fmt);
592 				} else {
593 					snprintf(commbuff->fname, MAXPATHLEN-1, "nfcapd.%s", fmt);
594 				}
595 				commbuff->fname[MAXPATHLEN-1] = '\0';
596 
597 				if ( launcher_alive ) {
598 					LogInfo("Signal launcher");
599 					kill(launcher_pid, SIGHUP);
600 				} else
601 					LogError("ERROR: Launcher died unexpectedly!");
602 
603 			}
604 
605 			LogInfo("Total ignored packets: %u", ignored_packets);
606 			ignored_packets = 0;
607 
608 			if ( done )
609 				break;
610 
611 			// update alarm for next cycle
612 			t_start += twin;
613 			/* t_start = filename time stamp: begin of slot
614 		 	* + twin = end of next time interval
615 		 	* + 1 = act at least 1s after time window expired
616 		 	* - t_now = difference value to now
617 		 	*/
618 			alarm(t_start + twin + 1 - t_now);
619 
620 		}
621 
622 		/* check for error condition or done . errno may only be EINTR */
623 		if ( cnt < 0 ) {
624 			if ( periodic_trigger ) {
625 				// alarm triggered, no new flow data
626 				periodic_trigger = 0;
627 				continue;
628 			}
629 			if ( done )
630 				// signaled to terminate - exit from loop
631 				break;
632 			else {
633 				/* this should never be executed as it should be caught in other places */
634 				LogError("error condition in '%s', line '%d', cnt: %i", __FILE__, __LINE__ ,(int)cnt);
635 				continue;
636 			}
637 		}
638 
639 		/* enough data? */
640 		if ( cnt == 0 )
641 			continue;
642 
643 		// get flow source record for current packet, identified by sender IP address
644 		fs = GetFlowSource(&nf_sender);
645 		if ( fs == NULL ) {
646 			fs = AddDynamicSource(&FlowSource, &nf_sender);
647 			if ( fs == NULL ) {
648 				LogError("Skip UDP packet. Ignored packets so far %u packets", ignored_packets);
649 				ignored_packets++;
650 				continue;
651 			}
652 			if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
653 				LogError("Failed to initialise bookkeeper for new source");
654 				// fatal error
655 				return;
656 			}
657 			fs->nffile = OpenNewFile(fs->current, NULL, compress, 0, NULL);
658 			if ( !fs->nffile ) {
659 				LogError("Failed to open new collector file");
660 				return;
661 			}
662 		}
663 
664 		/* check for too little data - cnt must be > 0 at this point */
665 		if ( cnt < sizeof(common_flow_header_t) ) {
666 			LogError("Ident: %s, Data length error: too little data for common netflow header. cnt: %i",fs->Ident, (int)cnt);
667 			fs->bad_packets++;
668 			continue;
669 		}
670 
671 		fs->received = tv;
672 		/* Process data - have a look at the common header */
673 		version = ntohs(nf_header->version);
674 		switch (version) {
675 			case 1:
676 				Process_v1(in_buff, cnt, fs);
677 				break;
678 			case 5: // fall through
679 			case 7:
680 				Process_v5_v7(in_buff, cnt, fs);
681 				break;
682 			case 9:
683 				Process_v9(in_buff, cnt, fs);
684 				break;
685 			case 10:
686 				Process_IPFIX(in_buff, cnt, fs);
687 				break;
688 			case 255:
689 				// blast test header
690 				if ( verbose ) {
691 					uint16_t count = ntohs(nf_header->count);
692 					if ( blast_cnt != count ) {
693 							// LogError("Mismatch blast check: Expected %u got %u\n", blast_cnt, count);
694 						blast_cnt = count;
695 						blast_failures++;
696 					} else {
697 						blast_cnt++;
698 					}
699 					if ( blast_cnt == 65535 ) {
700 						fprintf(stderr, "Total missed packets: %u\n", blast_failures);
701 						done = 1;
702 					}
703 					break;
704 				}
705 			default:
706 				// data error, while reading data from socket
707 				LogError("Ident: %s, Error reading netflow header: Unexpected netflow version %i", fs->Ident, version);
708 				fs->bad_packets++;
709 				continue;
710 
711 				// not reached
712 				break;
713 		}
714 		// each Process_xx function has to process the entire input buffer, therefore it's empty now.
715 		export_packets++;
716 
717 		// flush current buffer to disc
718 		if ( fs->nffile->block_header->size > BUFFSIZE ) {
719 			// fishy! - we already wrote into someone elses memory! - I'm sorry
720 			// reset output buffer - data may be lost, as we don not know, where it happen
721 			fs->nffile->block_header->size 		 = 0;
722 			fs->nffile->block_header->NumRecords = 0;
723 			fs->nffile->buff_ptr = (void *)((pointer_addr_t)fs->nffile->block_header + sizeof(data_block_header_t) );
724 			LogError("### Software bug ### Ident: %s, output buffer overflow: expect memory inconsitency", fs->Ident);
725 		}
726 	}
727 
728 	if ( verbose && blast_failures ) {
729 		fprintf(stderr, "Total missed packets: %u\n", blast_failures);
730 	}
731 	free(in_buff);
732 
733 	fs = FlowSource;
734 	while ( fs ) {
735 		DisposeFile(fs->nffile);
736 		fs = fs->next;
737 	}
738 
739 } /* End of run */
740 
main(int argc,char ** argv)741 int main(int argc, char **argv) {
742 
743 char	*bindhost, *datadir, pidstr[32], *launch_process;
744 char	*userid, *groupid, *checkptr, *listenport, *mcastgroup, *extension_tags;
745 char	*Ident, *dynsrcdir, *time_extension, pidfile[MAXPATHLEN];
746 struct stat fstat;
747 packet_function_t receive_packet;
748 repeater_t repeater[MAX_REPEATERS];
749 FlowSource_t *fs;
750 struct sigaction act;
751 int		family, bufflen;
752 time_t 	twin, t_start;
753 int		sock, do_daemonize, expire, spec_time_extension, report_sequence;
754 int		subdir_index, sampling_rate, compress;
755 int		c, i;
756 #ifdef PCAP
757 char	*pcap_file = NULL;
758 #endif
759 
760 	receive_packet 	= recvfrom;
761 	verbose = do_daemonize = 0;
762 	bufflen  		= 0;
763 	family			= AF_UNSPEC;
764 	launcher_pid	= 0;
765 	launcher_alive	= 0;
766 	report_sequence	= 0;
767 	listenport		= DEFAULTCISCOPORT;
768 	bindhost 		= NULL;
769 	mcastgroup		= NULL;
770 	pidfile[0]		= 0;
771 	launch_process	= NULL;
772 	userid 			= groupid = NULL;
773 	twin	 		= TIME_WINDOW;
774 	datadir	 		= NULL;
775 	subdir_index	= 0;
776 	time_extension	= "%Y%m%d%H%M";
777 	spec_time_extension = 0;
778 	expire			= 0;
779 	sampling_rate	= 1;
780 	compress		= NOT_COMPRESSED;
781 	memset((void *)&repeater, 0, sizeof(repeater));
782 	for ( i = 0; i < MAX_REPEATERS; i++ ) {
783 		repeater[i].family = AF_UNSPEC;
784 	}
785 	Ident			= "none";
786 	FlowSource		= NULL;
787 	extension_tags	= DefaultExtensions;
788 	dynsrcdir		= NULL;
789 
790 	while ((c = getopt(argc, argv, "46ef:whEVI:DB:b:jl:J:M:n:p:P:R:S:s:T:t:x:Xru:g:yzZ")) != EOF) {
791 		switch (c) {
792 			case 'h':
793 				usage(argv[0]);
794 				exit(0);
795 				break;
796 			case 'u':
797 				userid  = optarg;
798 				break;
799 			case 'g':
800 				groupid  = optarg;
801 				break;
802 			case 'e':
803 				expire = 1;
804 				break;
805 			case 'f': {
806 #ifdef PCAP
807 				struct stat	fstat;
808 				pcap_file = optarg;
809 				stat(pcap_file, &fstat);
810 				if ( !S_ISREG(fstat.st_mode) ) {
811 					fprintf(stderr, "Not a regular file: %s\n", pcap_file);
812 					exit(254);
813 				}
814 #else
815 				fprintf(stderr, "PCAP reader not compiled! Option ignored!\n");
816 #endif
817 				} break;
818 			case 'E':
819 				verbose = 1;
820 				Setv6Mode(1);
821 				break;
822 			case 'V':
823 				printf("%s: Version: %s\n",argv[0], nfdump_version);
824 				exit(0);
825 				break;
826 			case 'D':
827 				do_daemonize = 1;
828 				break;
829 			case 'I':
830 				Ident = strdup(optarg);
831 				break;
832 			case 'M':
833 				dynsrcdir = strdup(optarg);
834 				if ( strlen(dynsrcdir) > MAXPATHLEN ) {
835 					fprintf(stderr, "ERROR: Path too long!\n");
836 					exit(255);
837 				}
838 				if ( stat(dynsrcdir, &fstat) < 0 ) {
839 					fprintf(stderr, "stat() failed on %s: %s\n", dynsrcdir, strerror(errno));
840 					exit(255);
841 				}
842 				if ( !(fstat.st_mode & S_IFDIR) ) {
843 					fprintf(stderr, "No such directory: %s\n", dynsrcdir);
844 					break;
845 				}
846 				if ( !SetDynamicSourcesDir(&FlowSource, dynsrcdir) ) {
847 					fprintf(stderr, "-l, -M and -n are mutually exclusive\n");
848 					break;
849 				}
850 				break;
851 			case 'n':
852 				if ( AddFlowSource(&FlowSource, optarg) != 1 )
853 					exit(255);
854 				break;
855 			case 'w':
856 				// allow for compatibility - always sync timeslot
857 				break;
858 			case 'B':
859 				bufflen = strtol(optarg, &checkptr, 10);
860 				if ( (checkptr != NULL && *checkptr == 0) && bufflen > 0 )
861 					break;
862 				fprintf(stderr,"Argument error for -B\n");
863 				exit(255);
864 			case 'b':
865 				bindhost = optarg;
866 				break;
867 			case 'J':
868 				mcastgroup = optarg;
869 				break;
870 			case 'p':
871 				listenport = optarg;
872 				break;
873 			case 'P':
874 				if ( optarg[0] == '/' ) { 	// absolute path given
875 					strncpy(pidfile, optarg, MAXPATHLEN-1);
876 				} else {					// path relative to current working directory
877 					char tmp[MAXPATHLEN];
878 					if ( !getcwd(tmp, MAXPATHLEN-1) ) {
879 						fprintf(stderr, "Failed to get current working directory: %s\n", strerror(errno));
880 						exit(255);
881 					}
882 					tmp[MAXPATHLEN-1] = 0;
883 					if ( (strlen(tmp) + strlen(optarg) + 3) < MAXPATHLEN ) {
884 						snprintf(pidfile, MAXPATHLEN - 3 - strlen(tmp), "%s/%s", tmp, optarg);
885 					} else {
886 						fprintf(stderr, "pidfile MAXPATHLEN error:\n");
887 						exit(255);
888 					}
889 				}
890 				// pidfile now absolute path
891 				pidfile[MAXPATHLEN-1] = 0;
892 				break;
893 			case 'R': {
894 				char *port, *hostname;
895 				char *p = strchr(optarg, '/');
896 				int i = 0;
897 				if ( p ) {
898 					*p++ = '\0';
899 					port = strdup(p);
900 				} else {
901 					port = DEFAULTCISCOPORT;
902 				}
903 				hostname = strdup(optarg);
904 				while ( repeater[i].hostname && (i < MAX_REPEATERS) ) i++;
905 				if ( i == MAX_REPEATERS ) {
906 					fprintf(stderr, "Too many packet repeaters! Max: %i repeaters allowed.\n", MAX_REPEATERS);
907 					exit(255);
908 				}
909 				repeater[i].hostname = hostname;
910 				repeater[i].port 	 = port;
911 
912 				break; }
913 			case 'r':
914 				report_sequence = 1;
915 				break;
916 			case 's':
917 				// a negative sampling rate is set as the overwrite sampling rate
918 				sampling_rate = (int)strtol(optarg, (char **)NULL, 10);
919 				if ( (sampling_rate == 0 ) ||
920 					 (sampling_rate < 0 && sampling_rate < -10000000) ||
921 					 (sampling_rate > 0 && sampling_rate > 10000000) ) {
922 					fprintf(stderr, "Invalid sampling rate: %s\n", optarg);
923 					exit(255);
924 				}
925 				break;
926 			case 'T': {
927 				size_t len = strlen(optarg);
928 				extension_tags = optarg;
929 				if ( len == 0 || len > 128 ) {
930 					fprintf(stderr, "Extension length error. Unexpected option '%s'\n", extension_tags);
931 					exit(255);
932 				}
933 				break; }
934 			case 'l':
935 				datadir = optarg;
936 				if ( strlen(datadir) > MAXPATHLEN ) {
937 					fprintf(stderr, "ERROR: Path too long!\n");
938 					exit(255);
939 				}
940 				datadir = realpath(datadir, NULL);
941 				if ( stat(datadir, &fstat) < 0 ) {
942 					fprintf(stderr, "stat() failed on %s: %s\n", datadir, strerror(errno));
943 					exit(255);
944 				}
945 				if ( !(fstat.st_mode & S_IFDIR) ) {
946 					fprintf(stderr, "No such directory: %s\n", datadir);
947 					break;
948 				}
949 				break;
950 			case 'S':
951 				subdir_index = atoi(optarg);
952 				break;
953 			case 't':
954 				twin = atoi(optarg);
955 				if ( twin < 2 ) {
956 					LogError("time interval <= 2s not allowed");
957 					exit(255);
958 				}
959 				if (twin < 60) {
960 					time_extension	= "%Y%m%d%H%M%S";
961 				}
962 				break;
963 			case 'x':
964 				launch_process = optarg;
965 				break;
966 			case 'j':
967 				if ( compress ) {
968 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
969 					exit(255);
970 				}
971 				compress = BZ2_COMPRESSED;
972 				break;
973 			case 'y':
974 				if ( compress ) {
975 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
976 					exit(255);
977 				}
978 				compress = LZ4_COMPRESSED;
979 				break;
980 			case 'z':
981 				if ( compress ) {
982 					LogError("Use one compression: -z for LZO, -j for BZ2 or -y for LZ4 compression\n");
983 					exit(255);
984 				}
985 				compress = LZO_COMPRESSED;
986 				break;
987 			case 'Z':
988 				time_extension	= "%Y%m%d%H%M%z";
989 				spec_time_extension = 1;
990 				break;
991 			case '4':
992 				if ( family == AF_UNSPEC )
993 					family = AF_INET;
994 				else {
995 					fprintf(stderr, "ERROR, Accepts only one protocol IPv4 or IPv6!\n");
996 					exit(255);
997 				}
998 				break;
999 			case '6':
1000 				if ( family == AF_UNSPEC )
1001 					family = AF_INET6;
1002 				else {
1003 					fprintf(stderr, "ERROR, Accepts only one protocol IPv4 or IPv6!\n");
1004 					exit(255);
1005 				}
1006 				break;
1007 			default:
1008 				usage(argv[0]);
1009 				exit(255);
1010 		}
1011 	}
1012 
1013 	if ( FlowSource == NULL && datadir == NULL && dynsrcdir == NULL ) {
1014 		fprintf(stderr, "ERROR, Missing -n (-l/-I) or -M source definitions\n");
1015 		exit(255);
1016 	}
1017 	if ( FlowSource == NULL && datadir != NULL && !AddDefaultFlowSource(&FlowSource, Ident, datadir) ) {
1018 		fprintf(stderr, "Failed to add default data collector directory\n");
1019 		exit(255);
1020 	}
1021 
1022 	if ( bindhost && mcastgroup ) {
1023 		fprintf(stderr, "ERROR, -b and -j are mutually exclusive!!\n");
1024 		exit(255);
1025 	}
1026 
1027 	if ( !InitLog(do_daemonize, argv[0], SYSLOG_FACILITY, verbose) ) {
1028 		exit(255);
1029 	}
1030 
1031 	if ( expire && spec_time_extension ) {
1032 		fprintf(stderr, "ERROR, -Z timezone extension breaks expire -e\n");
1033 		exit(255);
1034 	}
1035 
1036 	InitExtensionMaps(NO_EXTENSION_LIST);
1037 	SetupExtensionDescriptors(strdup(extension_tags));
1038 
1039 	// Debug code to read from pcap file
1040 #ifdef PCAP
1041 	sock = 0;
1042 	if ( pcap_file ) {
1043 		printf("Setup pcap reader\n");
1044 		setup_packethandler(pcap_file, NULL);
1045 		receive_packet 	= NextPacket;
1046 	} else
1047 #endif
1048 	if ( mcastgroup )
1049 		sock = Multicast_receive_socket (mcastgroup, listenport, family, bufflen);
1050 	else
1051 		sock = Unicast_receive_socket(bindhost, listenport, family, bufflen );
1052 
1053 	if ( sock == -1 ) {
1054 		fprintf(stderr,"Terminated due to errors.\n");
1055 		exit(255);
1056 	}
1057 
1058 	i = 0;
1059 	while ( repeater[i].hostname && (i < MAX_REPEATERS) ) {
1060 		repeater[i].sockfd = Unicast_send_socket (repeater[i].hostname, repeater[i].port, repeater[i].family, bufflen,
1061 											&repeater[i].addr, &repeater[i].addrlen );
1062 		if ( repeater[i].sockfd <= 0 )
1063 			exit(255);
1064 		LogInfo("Replay flows to host: %s port: %s", repeater[i].hostname, repeater[i].port);
1065 		i++;
1066 	}
1067 
1068 	if ( sampling_rate < 0 ) {
1069 		default_sampling = -sampling_rate;
1070 		overwrite_sampling = default_sampling;
1071 	} else {
1072 		default_sampling = sampling_rate;
1073 	}
1074 
1075 	SetPriv(userid, groupid);
1076 
1077 	if ( subdir_index && !InitHierPath(subdir_index) ) {
1078 		close(sock);
1079 		exit(255);
1080 	}
1081 
1082 	// check if pid file exists and if so, if a process with registered pid is running
1083 	if ( strlen(pidfile) ) {
1084 		int pidf;
1085 		pidf = open(pidfile, O_RDONLY, 0);
1086 		if ( pidf > 0 ) {
1087 			// pid file exists
1088 			char s[32];
1089 			ssize_t len;
1090 			len = read(pidf, (void *)s, 31);
1091 			close(pidf);
1092 			s[31] = '\0';
1093 			if ( len < 0 ) {
1094 				fprintf(stderr, "read() error existing pid file: %s\n", strerror(errno));
1095 				exit(255);
1096 			} else {
1097 				unsigned long pid = atol(s);
1098 				if ( pid == 0 ) {
1099 					// garbage - use this file
1100 					unlink(pidfile);
1101 				} else {
1102 					if ( kill(pid, 0) == 0 ) {
1103 						// process exists
1104 						fprintf(stderr, "A process with pid %lu registered in pidfile %s is already running!\n",
1105 							pid, strerror(errno));
1106 						exit(255);
1107 					} else {
1108 						// no such process - use this file
1109 						unlink(pidfile);
1110 					}
1111 				}
1112 			}
1113 		} else {
1114 			if ( errno != ENOENT ) {
1115 				fprintf(stderr, "open() error existing pid file: %s\n", strerror(errno));
1116 				exit(255);
1117 			} // else errno == ENOENT - no file - this is fine
1118 		}
1119 	}
1120 
1121 	if (argc - optind > 1) {
1122 		usage(argv[0]);
1123 		close(sock);
1124 		exit(255);
1125 	}
1126 
1127 	t_start = time(NULL);
1128 	t_start = t_start - ( t_start % twin);
1129 
1130 	if ( do_daemonize ) {
1131 		verbose = 0;
1132 		daemonize();
1133 	}
1134 	if (strlen(pidfile)) {
1135 		pid_t pid = getpid();
1136 		int pidf  = open(pidfile, O_RDWR|O_TRUNC|O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
1137 		if ( pidf == -1 ) {
1138 			LogError("Error opening pid file: '%s' %s", pidfile, strerror(errno));
1139 			close(sock);
1140 			exit(255);
1141 		}
1142 		snprintf(pidstr,31,"%lu\n", (unsigned long)pid);
1143 		if ( write(pidf, pidstr, strlen(pidstr)) <= 0 ) {
1144 			LogError("Error write pid file: '%s' %s", pidfile, strerror(errno));
1145 		}
1146 		close(pidf);
1147 	}
1148 
1149 	done = 0;
1150 	if ( launch_process || expire ) {
1151 
1152 		// create laucher comm memory struct
1153 		shmem = mmap(0, sizeof(srecord_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
1154 		if ( shmem == MAP_FAILED) {
1155 			LogError("mmap() error in %s:%i: %s", __FILE__, __LINE__ , strerror(errno));
1156 			close(sock);
1157 			exit(255);
1158 		}
1159 
1160 		launcher_pid = fork();
1161 		switch (launcher_pid) {
1162 			case 0:
1163 				// child
1164 				close(sock);
1165 				launcher(shmem, FlowSource, launch_process, expire);
1166 				_exit(0);
1167 				break;
1168 			case -1:
1169 				LogError("fork() error: %s", strerror(errno));
1170 				if ( strlen(pidfile) )
1171 					unlink(pidfile);
1172 				exit(255);
1173 				break;
1174 			default:
1175 				// parent
1176 			launcher_alive = 1;
1177 			LogInfo("Launcher[%i] forked", launcher_pid);
1178 		}
1179 	}
1180 
1181 	fs = FlowSource;
1182 	while ( fs ) {
1183 		if ( InitBookkeeper(&fs->bookkeeper, fs->datadir, getpid(), launcher_pid) != BOOKKEEPER_OK ) {
1184 			LogError("initialize bookkeeper failed.");
1185 
1186 			// release all already allocated bookkeepers
1187 			fs = FlowSource;
1188 			while ( fs && fs->bookkeeper ) {
1189 				ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1190 				fs = fs->next;
1191 			}
1192 			close(sock);
1193 			if ( launcher_pid )
1194 				kill_launcher(launcher_pid);
1195 			if ( strlen(pidfile) )
1196 				unlink(pidfile);
1197 			exit(255);
1198 		}
1199 
1200 		// Init the extension map list
1201 		if ( !InitExtensionMapList(fs) ) {
1202 			// error message goes to syslog
1203 			exit(255);
1204 		}
1205 
1206 		fs = fs->next;
1207 	}
1208 
1209 	/* Signal handling */
1210 	memset((void *)&act,0,sizeof(struct sigaction));
1211 	act.sa_handler = IntHandler;
1212 	sigemptyset(&act.sa_mask);
1213 	act.sa_flags = 0;
1214 	sigaction(SIGTERM, &act, NULL);
1215 	sigaction(SIGINT, &act, NULL);
1216 	sigaction(SIGHUP, &act, NULL);
1217 	sigaction(SIGALRM, &act, NULL);
1218 	sigaction(SIGCHLD, &act, NULL);
1219 
1220 	LogInfo("Startup.");
1221 	run(receive_packet, sock, repeater, twin, t_start, report_sequence, subdir_index,
1222 		time_extension, compress);
1223 	close(sock);
1224 	kill_launcher(launcher_pid);
1225 
1226 	fs = FlowSource;
1227 	while ( fs && fs->bookkeeper ) {
1228 		dirstat_t 	*dirstat;
1229 		// if we do not auto expire and there is a stat file, update the stats before we leave
1230 		if ( expire == 0 && ReadStatInfo(fs->datadir, &dirstat, LOCK_IF_EXISTS) == STATFILE_OK ) {
1231 			UpdateBookStat(dirstat, fs->bookkeeper);
1232 			WriteStatInfo(dirstat);
1233 			LogInfo("Updating statinfo in directory '%s'", datadir);
1234 		}
1235 
1236 		ReleaseBookkeeper(fs->bookkeeper, DESTROY_BOOKKEEPER);
1237 		fs = fs->next;
1238 	}
1239 
1240 	LogInfo("Terminating nfcapd.");
1241 	EndLog();
1242 
1243 	if ( strlen(pidfile) )
1244 		unlink(pidfile);
1245 
1246 	return 0;
1247 
1248 } /* End of main */
1249