1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #include "config.h"
33
34 #include <stdio.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <stdarg.h>
38 #include <errno.h>
39 #include <time.h>
40 #include <string.h>
41 #include <ctype.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <sys/socket.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <sys/resource.h>
50
51 #ifdef HAVE_STDINT_H
52 #include <stdint.h>
53 #endif
54
55 #ifdef HAVE_STDIO_EXT_H
56 #include <stdio_ext.h>
57 #endif
58
59 #include "util.h"
60 #include "nfdump.h"
61 #include "nffile.h"
62 #include "nfx.h"
63 #include "flist.h"
64 #include "nftree.h"
65 #include "nfnet.h"
66 #include "bookkeeper.h"
67 #include "collector.h"
68 #include "exporter.h"
69 #include "netflow_v5_v7.h"
70 #include "netflow_v9.h"
71 #include "nfprof.h"
72
73 #define DEFAULTCISCOPORT "9995"
74 #define DEFAULTHOSTNAME "127.0.0.1"
75
76 #undef FPURGE
77 #ifdef HAVE___FPURGE
78 #define FPURGE __fpurge
79 #endif
80 #ifdef HAVE_FPURGE
81 #define FPURGE fpurge
82 #endif
83
84 /* Global Variables */
85 FilterEngine_t *Engine;
86 static int verbose;
87
88 /* Local Variables */
89 static const char *nfdump_version = VERSION;
90
91 static send_peer_t peer;
92
93 extension_map_list_t *extension_map_list;
94
95 /* Function Prototypes */
96 static void usage(char *name);
97
98 static void send_blast(unsigned int delay );
99
100 static void send_data(char *rfile, time_t twin_start, time_t twin_end, uint32_t count,
101 unsigned int delay, int confirm, int netflow_version, int distribution);
102
103 static int FlushBuffer(int confirm);
104
105 /* Functions */
106
107 #include "nffile_inline.c"
108 #include "nfdump_inline.c"
109
/*
 * usage - print the command line synopsis and the option summary to stdout.
 *
 * name: program name inserted into the synopsis line (main() passes argv[0]).
 *
 * The defaults quoted in the text mirror the initial values set in main();
 * in particular the packet delay starts at 1 usec.
 */
static void usage(char *name) {
	printf("usage %s [options] [\"filter\"]\n"
		"-h\t\tthis text you see right here\n"
		"-V\t\tPrint version and exit.\n"
		"-E\t\tPrint verbose messages. For debugging purpose only.\n"
		"-H <Host/ip>\tTarget IP address default: 127.0.0.1\n"
		"-j <mcast>\tSend packets to multicast group\n"
		"-4\t\tForce IPv4 protocol.\n"
		"-6\t\tForce IPv6 protocol.\n"
		"-L <log>\tLog to syslog facility <log>\n"
		"-p <port>\tTarget port default 9995\n"
		"-d <usec>\tDelay in usec between packets. default 1\n"
		"-c <cnt>\tPacket count. default send all packets\n"
		"-b <bsize>\tSend buffer size.\n"
		"-r <input>\tread from file. default: stdin\n"
		"-f <filter>\tfilter syntaxfile\n"
		"-v <version>\tUse netflow version to send flows. Either 5 or 9\n"
		"-z <distribution>\tSimulate real time distribution with coefficient\n"
		"-t <time>\ttime window for sending packets\n"
		"\t\tyyyy/MM/dd.hh:mm:ss[-yyyy/MM/dd.hh:mm:ss]\n"
		, name);
} /* usage */
132
FlushBuffer(int confirm)133 static int FlushBuffer(int confirm) {
134 size_t len = (pointer_addr_t)peer.buff_ptr - (pointer_addr_t)peer.send_buffer;
135 static unsigned long cnt = 1;
136
137 peer.flush = 0;
138 peer.buff_ptr = peer.send_buffer;
139 if ( confirm ) {
140 FPURGE(stdin);
141 printf("Press any key to send next UDP packet [%lu] ", cnt++);
142 fflush(stdout);
143 fgetc(stdin);
144 }
145 return sendto(peer.sockfd, peer.send_buffer, len, 0, (struct sockaddr *)&(peer.addr), peer.addrlen);
146 } // End of FlushBuffer
147
148
send_blast(unsigned int delay)149 static void send_blast(unsigned int delay ) {
150 common_flow_header_t *header;
151 nfprof_t profile_data;
152 int i, ret;
153 u_long usec, sec;
154 double fps;
155
156 peer.send_buffer = malloc(1400);
157 if ( !peer.send_buffer ) {
158 perror("Memory allocation error");
159 close(peer.sockfd);
160 return;
161 }
162 header = (common_flow_header_t *)peer.send_buffer;
163 header->version = htons(255);
164 nfprof_start(&profile_data);
165 for ( i = 0; i < 65535; i++ ) {
166 header->count = htons(i);
167 ret = sendto(peer.sockfd, peer.send_buffer, 1400, 0, (struct sockaddr *)&peer.addr, peer.addrlen);
168 if ( ret < 0 || ret != 1400 ) {
169 perror("Error sending data");
170 }
171
172 if ( delay ) {
173 // sleep as specified
174 usleep(delay);
175 }
176 }
177 nfprof_end(&profile_data, 8*65535*1400);
178
179
180 usec = profile_data.used.ru_utime.tv_usec + profile_data.used.ru_stime.tv_usec;
181 sec = profile_data.used.ru_utime.tv_sec + profile_data.used.ru_stime.tv_sec;
182
183 if (usec > 1000000)
184 usec -= 1000000, ++sec;
185
186 if (profile_data.tend.tv_usec < profile_data.tstart.tv_usec)
187 profile_data.tend.tv_usec += 1000000, --profile_data.tend.tv_sec;
188
189 usec = profile_data.tend.tv_usec - profile_data.tstart.tv_usec;
190 sec = profile_data.tend.tv_sec - profile_data.tstart.tv_sec;
191 fps = (double)profile_data.numflows / ((double)sec + ((double)usec/1000000));
192
193 fprintf(stdout, "Wall: %lu.%-3.3lus bps: %-10.1f\n", sec, usec/1000, fps);
194
195
196 } // End of send_blast
197
send_data(char * rfile,time_t twin_start,time_t twin_end,uint32_t count,unsigned int delay,int confirm,int netflow_version,int distribution)198 static void send_data(char *rfile, time_t twin_start,
199 time_t twin_end, uint32_t count, unsigned int delay, int confirm, int netflow_version, int distribution) {
200 master_record_t master_record;
201 common_record_t *flow_record;
202 nffile_t *nffile;
203 int i, done, ret, again;
204 uint32_t numflows, cnt;
205
206 // z-parameter variables
207 struct timeval todayTime, currentTime;
208 double today = 0, reftime = 0;
209 int reducer = 0;
210
211 // Get the first file handle
212 nffile = GetNextFile(NULL, twin_start, twin_end);
213 if ( !nffile ) {
214 LogError("GetNextFile() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
215 return;
216 }
217 if ( nffile == EMPTY_LIST ) {
218 LogError("Empty file list. No files to process\n");
219 return;
220 }
221
222 peer.send_buffer = malloc(UDP_PACKET_SIZE);
223 peer.flush = 0;
224 if ( !peer.send_buffer ) {
225 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
226 CloseFile(nffile);
227 DisposeFile(nffile);
228 return;
229 }
230 peer.buff_ptr = peer.send_buffer;
231 peer.endp = (void *)((pointer_addr_t)peer.send_buffer + UDP_PACKET_SIZE - 1);
232
233 if ( netflow_version == 5 )
234 Init_v5_v7_output(&peer);
235 else
236 Init_v9_output(&peer);
237
238 numflows = 0;
239 done = 0;
240
241 // setup Filter Engine to point to master_record, as any record read from file
242 // is expanded into this record
243 Engine->nfrecord = (uint64_t *)&master_record;
244
245 cnt = 0;
246 while ( !done ) {
247 // get next data block from file
248 ret = ReadBlock(nffile);
249
250 switch (ret) {
251 case NF_CORRUPT:
252 case NF_ERROR:
253 if ( ret == NF_CORRUPT )
254 LogError("Skip corrupt data file '%s'\n",GetCurrentFilename());
255 else
256 LogError("Read error in file '%s': %s\n",GetCurrentFilename(), strerror(errno) );
257 // fall through - get next file in chain
258 case NF_EOF: {
259 nffile_t *next = GetNextFile(nffile, twin_start, twin_end);
260 if ( next == EMPTY_LIST ) {
261 done = 1;
262 }
263 if ( next == NULL ) {
264 done = 1;
265 LogError("Unexpected end of file list\n");
266 }
267 // else continue with next file
268 continue;
269
270 } break; // not really needed
271 }
272
273 if ( nffile->block_header->id != DATA_BLOCK_TYPE_2 ) {
274 LogError("Can't process block type %u. Skip block.\n", nffile->block_header->id);
275 continue;
276 }
277
278 // cnt is the number of blocks, which survived the filter
279 // and added to the output buffer
280 flow_record = nffile->buff_ptr;
281 uint32_t sumSize = 0;
282 for ( i=0; i < nffile->block_header->NumRecords; i++ ) {
283 int match;
284 if ( (sumSize + flow_record->size) > ret ) {
285 LogError("Corrupt data file. Inconsistent block size in %s line %d\n", __FILE__, __LINE__);
286 exit(255);
287 }
288 sumSize += flow_record->size;
289
290 switch ( flow_record->type ) {
291 case CommonRecordType: {
292 if ( extension_map_list->slot[flow_record->ext_map] == NULL ) {
293 LogError("Corrupt data file. Missing extension map %u. Skip record.\n", flow_record->ext_map);
294 flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
295 continue;
296 }
297
298 // if no filter is given, the result is always true
299 ExpandRecord_v2( flow_record, extension_map_list->slot[flow_record->ext_map], NULL, &master_record);
300
301 match = twin_start && (master_record.first < twin_start || master_record.last > twin_end) ? 0 : 1;
302
303 // filter netflow record with user supplied filter
304 if ( match )
305 match = (*Engine->FilterEngine)(Engine);
306
307 if ( match == 0 ) { // record failed to pass all filters
308 // increment pointer by number of bytes for netflow record
309 flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
310 // go to next record
311 continue;
312 }
313 // Records passed filter -> continue record processing
314
315 if ( netflow_version == 5 )
316 again = Add_v5_output_record(&master_record, &peer);
317 else
318 again = Add_v9_output_record(&master_record, &peer);
319
320 cnt++;
321 numflows++;
322
323 if ( peer.flush ) {
324 int err = FlushBuffer(confirm);
325
326 if ( err < 0 ) {
327 perror("Error sending data");
328 CloseFile(nffile);
329 DisposeFile(nffile);
330 return;
331 }
332
333 if ( delay ) {
334 // sleep as specified
335 usleep(delay);
336 }
337 cnt = 0;
338 }
339
340 if ( again ) {
341 if ( netflow_version == 5 )
342 Add_v5_output_record(&master_record, &peer);
343 else
344 Add_v9_output_record(&master_record, &peer);
345 cnt++;
346 }
347
348 } break;
349 case ExtensionMapType: {
350 extension_map_t *map = (extension_map_t *)flow_record;
351
352 int ret = Insert_Extension_Map(extension_map_list, map);
353 switch (ret) {
354 case 0:
355 break; // map already known and flushed
356 case 1:
357 break; // new map
358 default:
359 LogError("Corrupt data file. Unable to decode at %s line %d\n", __FILE__, __LINE__);
360 exit(255);
361 }
362
363 } break;
364 case LegacyRecordType1:
365 case LegacyRecordType2:
366 case ExporterInfoRecordType:
367 case ExporterStatRecordType:
368 case SamplerInfoRecordype:
369 // Silently skip exporter/sampler records
370 break;
371 default: {
372 LogError("Skip unknown record type %i\n", flow_record->type);
373 }
374 }
375
376 // z-parameter
377 //first and last are line (tstart and tend) timestamp with milliseconds
378 // first = (double) flow_record->first + ((double)flow_record->msec_first / 1000);
379 double last = (double) flow_record->last + ((double)flow_record->msec_last / 1000);
380
381 gettimeofday(¤tTime, NULL);
382 double now = (double)currentTime.tv_sec + (double)currentTime.tv_usec / 1000000;
383
384 // remove incoherent values
385 if (reftime == 0 && last > 1000000000 && last < 2000000000){
386 reftime = last;
387 gettimeofday(&todayTime, NULL);
388 today = (double)todayTime.tv_sec + (double)todayTime.tv_usec / 1000000;
389 }
390
391 // Reducer avoid to have too much computation: It takes 1 over 3 line to regulate sending time
392 if (reducer % 3 == 0 && distribution != 0 && reftime != 0 && last > 1000000000){
393 while (last - reftime > distribution * (now - today)){
394 gettimeofday(¤tTime, NULL);
395 now = (double)currentTime.tv_sec + (double)currentTime.tv_usec / 1000000;
396 }
397 }
398 reducer++;
399
400 // Advance pointer by number of bytes for netflow record
401 flow_record = (common_record_t *)((pointer_addr_t)flow_record + flow_record->size);
402
403 }
404 } // while
405
406 // flush still remaining records
407 if ( cnt ) {
408 ret = FlushBuffer(confirm);
409
410 if ( ret < 0 ) {
411 perror("Error sending data");
412 }
413
414 } // if cnt
415
416 if (nffile) {
417 CloseFile(nffile);
418 DisposeFile(nffile);
419 }
420
421 close(peer.sockfd);
422
423 return;
424
425 } // End of send_data
426
427
main(int argc,char ** argv)428 int main( int argc, char **argv ) {
429 struct stat stat_buff;
430 char *rfile, *ffile, *filter, *tstring;
431 int c, confirm, ffd, ret, blast, netflow_version, distribution;
432 unsigned int delay, count, sockbuff_size;
433 time_t t_start, t_end;
434
435 rfile = ffile = filter = tstring = NULL;
436 t_start = t_end = 0;
437
438 peer.hostname = NULL;
439 peer.port = DEFAULTCISCOPORT;
440 peer.mcast = 0;
441 peer.family = AF_UNSPEC;
442 peer.sockfd = 0;
443
444 delay = 1;
445 count = 0xFFFFFFFF;
446 sockbuff_size = 0;
447 netflow_version = 5;
448 blast = 0;
449 verbose = 0;
450 confirm = 0;
451 distribution = 0;
452 while ((c = getopt(argc, argv, "46BhH:i:K:L:p:d:c:b:j:r:f:t:v:z:VY")) != EOF) {
453 switch (c) {
454 case 'h':
455 usage(argv[0]);
456 exit(0);
457 break;
458 case 'B':
459 blast = 1;
460 break;
461 case 'E':
462 verbose = 1;
463 break;
464 case 'V':
465 printf("%s: Version: %s\n",argv[0], nfdump_version);
466 exit(0);
467 break;
468 case 'Y':
469 confirm = 1;
470 break;
471 case 'H':
472 case 'i': // compatibility with old version
473 peer.hostname = strdup(optarg);
474 peer.mcast = 0;
475 break;
476 case 'j':
477 if ( peer.hostname == NULL ) {
478 peer.hostname = strdup(optarg);
479 peer.mcast = 1;
480 } else {
481 LogError("ERROR, -H(-i) and -j are mutually exclusive!!\n");
482 exit(255);
483 }
484 break;
485 case 'K':
486 LogError("*** Anonymization moved! Use nfanon to anonymize flows first!\n");
487 exit(255);
488 break;
489 case 'L':
490 if ( !InitLog(0, argv[0], optarg, verbose) )
491 exit(255);
492 break;
493 case 'p':
494 peer.port = strdup(optarg);
495 break;
496 case 'd':
497 delay = atoi(optarg);
498 break;
499 case 'v':
500 netflow_version = atoi(optarg);
501 if ( netflow_version != 5 && netflow_version != 9 ) {
502 LogError("Invalid netflow version: %s. Accept only 5 or 9!\n", optarg);
503 exit(255);
504 }
505 break;
506 case 'c':
507 count = atoi(optarg);
508 break;
509 case 'b':
510 sockbuff_size = atoi(optarg);
511 break;
512 case 'f':
513 ffile = optarg;
514 break;
515 case 't':
516 tstring = optarg;
517 break;
518 case 'r':
519 rfile = optarg;
520 break;
521 case 'z':
522 distribution = atoi(optarg);
523 break;
524 case '4':
525 if ( peer.family == AF_UNSPEC )
526 peer.family = AF_INET;
527 else {
528 LogError("ERROR, Accepts only one protocol IPv4 or IPv6!\n");
529 exit(255);
530 }
531 break;
532 case '6':
533 if ( peer.family == AF_UNSPEC )
534 peer.family = AF_INET6;
535 else {
536 LogError("ERROR, Accepts only one protocol IPv4 or IPv6!\n");
537 exit(255);
538 }
539 break;
540 default:
541 usage(argv[0]);
542 exit(0);
543 }
544 }
545 if (argc - optind > 1) {
546 usage(argv[0]);
547 exit(255);
548 } else {
549 /* user specified a pcap filter */
550 filter = argv[optind];
551 }
552
553 if ( peer.hostname == NULL )
554 peer.hostname = DEFAULTHOSTNAME;
555
556 if ( !filter && ffile ) {
557 if ( stat(ffile, &stat_buff) ) {
558 perror("Can't stat file");
559 exit(255);
560 }
561 filter = (char *)malloc(stat_buff.st_size);
562 if ( !filter ) {
563 perror("Memory error");
564 exit(255);
565 }
566 ffd = open(ffile, O_RDONLY);
567 if ( ffd < 0 ) {
568 perror("Can't open file");
569 exit(255);
570 }
571 ret = read(ffd, (void *)filter, stat_buff.st_size);
572 if ( ret < 0 ) {
573 perror("Error reading file");
574 close(ffd);
575 exit(255);
576 }
577 close(ffd);
578 }
579
580 if ( !filter )
581 filter = "any";
582
583 Engine = CompileFilter(filter);
584 if ( !Engine )
585 exit(254);
586
587 if ( peer.mcast )
588 peer.sockfd = Multicast_send_socket (peer.hostname, peer.port, peer.family, sockbuff_size,
589 &peer.addr, &peer.addrlen );
590 else
591 peer.sockfd = Unicast_send_socket (peer.hostname, peer.port, peer.family, sockbuff_size,
592 &peer.addr, &peer.addrlen );
593 if ( peer.sockfd <= 0 ) {
594 exit(255);
595 }
596
597 if ( blast ) {
598 send_blast(delay );
599 exit(0);
600 }
601
602 extension_map_list = InitExtensionMaps(NEEDS_EXTENSION_LIST);
603
604 SetupInputFileSequence(NULL,rfile, NULL);
605
606 if ( tstring ) {
607 if ( !ScanTimeFrame(tstring, &t_start, &t_end) )
608 exit(255);
609 }
610
611 send_data(rfile, t_start, t_end, count, delay, confirm, netflow_version,distribution);
612
613 FreeExtensionMaps(extension_map_list);
614
615 return 0;
616 }
617