1 /*
2  *  Copyright (c) 2009-2020, Peter Haag
3  *  Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4  *  All rights reserved.
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright notice,
12  *     this list of conditions and the following disclaimer in the documentation
13  *     and/or other materials provided with the distribution.
14  *   * Neither the name of the author nor the names of its contributors may be
15  *     used to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  *  POSSIBILITY OF SUCH DAMAGE.
29  *
30  */
31 
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 
36 #include <stdio.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40 #include <stdlib.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <errno.h>
44 #include <string.h>
45 
46 #ifdef HAVE_STDINT_H
47 #include <stdint.h>
48 #endif
49 
50 #include "util.h"
51 #include "nfdump.h"
52 #include "nffile.h"
53 #include "nfx.h"
54 #include "nfnet.h"
55 #include "output_raw.h"
56 #include "bookkeeper.h"
57 #include "collector.h"
58 #include "exporter.h"
59 #include "netflow_v5_v7.h"
60 
61 extern extension_descriptor_t extension_descriptor[];
62 
63 /* module limited globals */
64 static int verbose;
65 static uint32_t default_sampling;
66 static uint32_t overwrite_sampling;
67 
68 static extension_info_t v5_extension_info;		// common for all v5 records
69 static uint16_t v5_output_record_size, v5_output_record_base_size;
70 
71 // All required extension to save full v5 records
72 static uint16_t v5_full_mapp[] = { EX_IO_SNMP_2, EX_AS_2, EX_MULIPLE, EX_NEXT_HOP_v4, EX_ROUTER_IP_v4, EX_ROUTER_ID, EX_RECEIVED, 0 };
73 
74 // to simplify, assume blocks with 64 bit counters to check for enough buffer space
75 // regardless if 32 or 64 bit packet/byte counters
76 #define V5_BLOCK_DATA_SIZE (sizeof(ipv4_block_t) - sizeof(uint32_t) + 2 * sizeof(uint64_t))
77 
78 typedef struct exporter_v5_s {
79 	// identical to exporter_t
80 	struct exporter_v5_s *next;
81 
82 	// exporter information
83 	exporter_info_record_t info;
84 
85 	uint64_t	packets;			// number of packets sent by this exporter
86 	uint64_t	flows;				// number of flow records sent by this exporter
87 	uint32_t	sequence_failure;	// number of sequence failues
88 	uint32_t	padding_errors;		// number of sequence failues
89 
90 	// sampler
91 	sampler_t		*sampler;
92 	// end of exporter_t
93 
94 	// sequence vars
95 	int64_t	 last_sequence;
96 	int64_t  sequence, distance;
97 	int64_t  last_count;
98 
99 	int		first;
100 
101 	// extension map
102 	extension_map_t 	 *extension_map;
103 
104 } exporter_v5_t;
105 
106 // for sending netflow v5
107 static netflow_v5_header_t	*v5_output_header;
108 static netflow_v5_record_t	*v5_output_record;
109 static exporter_v5_t 		output_engine;
110 
111 static inline exporter_v5_t *GetExporter(FlowSource_t *fs, netflow_v5_header_t *header);
112 
113 static inline int CheckBufferSpace(nffile_t *nffile, size_t required);
114 
115 /* functions */
116 
117 #include "nffile_inline.c"
118 
Init_v5_v7_input(int v,uint32_t sampling,uint32_t overwrite)119 int Init_v5_v7_input(int v, uint32_t sampling, uint32_t overwrite) {
120 int i, id, map_index;
121 int extension_size;
122 uint16_t	map_size;
123 
124 	verbose 		   = v;
125 	default_sampling   = sampling;
126 	overwrite_sampling = overwrite;
127 	extension_size	   = 0;
128 
129 	// prepare v5 extension map
130 	v5_extension_info.map		   = NULL;
131 	v5_extension_info.next		   = NULL;
132 	v5_extension_info.offset_cache = NULL;
133 	v5_extension_info.ref_count	   = 0;
134 
135 	// default map - 0 extensions
136 	map_size 		 = sizeof(extension_map_t);
137 	i=0;
138 	while ( (id = v5_full_mapp[i]) != 0  ) {
139 		if ( extension_descriptor[id].enabled ) {
140 			extension_size += extension_descriptor[id].size;
141 			map_size += sizeof(uint16_t);
142 		}
143 		i++;
144 	}
145 	// extension_size contains the sum of all optional extensions
146 	// caculate the record size without counters!
147 	v5_output_record_base_size = COMMON_RECORD_DATA_SIZE + 8 + extension_size;  // + 8 for 2 x IPv4 addr
148 
149 	// align 32 bits
150 	if ( ( map_size & 0x3 ) != 0 )
151 		map_size += 2;
152 
153 	// Create a v5 extension map
154 	v5_extension_info.map = (extension_map_t *)malloc((size_t)map_size);
155 	if ( !v5_extension_info.map ) {
156 		LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
157 		return 0;
158 	}
159 	v5_extension_info.map->type 	  	  = ExtensionMapType;
160 	v5_extension_info.map->size 	  	  = map_size;
161 	v5_extension_info.map->map_id 	  	  = INIT_ID;
162 	v5_extension_info.map->extension_size = extension_size;
163 
164 	// see netflow_v5_v7.h for extension map description
165 	map_index = 0;
166 	i=0;
167 	while ( (id = v5_full_mapp[i]) != 0 ) {
168 		if ( extension_descriptor[id].enabled )
169 			v5_extension_info.map->ex_id[map_index++] = id;
170 		i++;
171 	}
172 	v5_extension_info.map->ex_id[map_index] = 0;
173 
174 	return 1;
175 } // End of Init_v5_input
176 
177 /*
178  * functions used for receiving netflow v5 records
179  */
180 
181 
GetExporter(FlowSource_t * fs,netflow_v5_header_t * header)182 static inline exporter_v5_t *GetExporter(FlowSource_t *fs, netflow_v5_header_t *header) {
183 exporter_v5_t **e = (exporter_v5_t **)&(fs->exporter_data);
184 sampler_t *sampler;
185 uint16_t	engine_tag = ntohs(header->engine_tag);
186 uint16_t	version    = ntohs(header->version);
187 #define IP_STRING_LEN   40
188 char ipstr[IP_STRING_LEN];
189 
190 	// search the appropriate exporter engine
191 	while ( *e ) {
192 		if ( (*e)->info.version == version && (*e)->info.id == engine_tag &&
193 			 (*e)->info.ip.V6[0] == fs->ip.V6[0] && (*e)->info.ip.V6[1] == fs->ip.V6[1])
194 			return *e;
195 		e = &((*e)->next);
196 	}
197 
198 	// nothing found
199 	*e = (exporter_v5_t *)malloc(sizeof(exporter_v5_t));
200 	if ( !(*e)) {
201 		LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
202 		return NULL;
203 	}
204 	memset((void *)(*e), 0, sizeof(exporter_v5_t));
205 	(*e)->next	 			= NULL;
206 	(*e)->info.header.type  = ExporterInfoRecordType;
207 	(*e)->info.header.size  = sizeof(exporter_info_record_t);
208 	(*e)->info.version 		= version;
209 	(*e)->info.id			= engine_tag;
210 	(*e)->info.ip			= fs->ip;
211 	(*e)->info.sa_family	= fs->sa_family;
212 	(*e)->sequence_failure	= 0;
213 	(*e)->padding_errors	= 0;
214 	(*e)->packets			= 0;
215 	(*e)->flows				= 0;
216 	(*e)->first	 			= 1;
217 
218 	sampler = (sampler_t *)malloc(sizeof(sampler_t));
219 	if ( !sampler ) {
220 		LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
221 		return NULL;
222 	}
223 	(*e)->sampler = sampler;
224 
225 	sampler->info.header.type 	= SamplerInfoRecordype;
226 	sampler->info.header.size	= sizeof(sampler_info_record_t);
227 	sampler->info.id			= -1;
228 	sampler->info.mode			= (0xC000 & ntohs(header->sampling_interval)) >> 14;
229 	sampler->info.interval		= 0x3fff & ntohs(header->sampling_interval);
230 	sampler->next				= NULL;
231 
232 	// default is global default_sampling ( user defined or unsampled => 1 )
233 	if ( sampler->info.interval == 0 )
234 		sampler->info.interval = default_sampling;
235 
236 	// copy the v5 extension map
237 	(*e)->extension_map		= (extension_map_t *)malloc(v5_extension_info.map->size);
238 	if ( !(*e)->extension_map ) {
239 		LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
240 		free(*e);
241 		*e = NULL;
242 		return NULL;
243 	}
244 	memcpy((void *)(*e)->extension_map, (void *)v5_extension_info.map, v5_extension_info.map->size);
245 
246 	if ( !AddExtensionMap(fs, (*e)->extension_map) ) {
247 		// bad - we must free this map and fail - otherwise data can not be read any more
248 		free((*e)->extension_map);
249 		free(*e);
250 		*e = NULL;
251 		return NULL;
252 	}
253 
254 	(*e)->info.sysid = 0;
255 	FlushInfoExporter(fs, &((*e)->info));
256 	sampler->info.exporter_sysid		= (*e)->info.sysid;
257 	FlushInfoSampler(fs, &(sampler->info));
258 
259 	if ( fs->sa_family == AF_INET ) {
260 		uint32_t _ip = htonl(fs->ip.V4);
261 		inet_ntop(AF_INET, &_ip, ipstr, sizeof(ipstr));
262 	} else if ( fs->sa_family == AF_INET6 ) {
263 		uint64_t _ip[2];
264 		_ip[0] = htonll(fs->ip.V6[0]);
265 		_ip[1] = htonll(fs->ip.V6[1]);
266 		inet_ntop(AF_INET6, &_ip, ipstr, sizeof(ipstr));
267 	} else {
268 		strncpy(ipstr, "<unknown>", IP_STRING_LEN);
269 	}
270 
271 
272 
273 	dbg_printf("New Exporter: v5 SysID: %u, Extension ID: %i, IP: %s, Sampling Mode: %i, Sampling Interval: %u\n",
274 		(*e)->info.sysid, (*e)->extension_map->map_id, ipstr, sampler->info.mode	,sampler->info.interval);
275 	LogInfo("Process_v5: New exporter: SysID: %u, engine id %u, type %u, IP: %s, Sampling Mode: %i, Sampling Interval: %u\n",
276 		(*e)->info.sysid, ( engine_tag & 0xFF ),( (engine_tag >> 8) & 0xFF ), ipstr, sampler->info.mode	,sampler->info.interval );
277 
278 	if ( overwrite_sampling > 0 )  {
279 		sampler->info.interval = overwrite_sampling;
280 		LogInfo("Process_v5: Hard overwrite sampling rate: %u\n", sampler->info.interval);
281 	}
282 
283 	return (*e);
284 
285 } // End of GetExporter
286 
Process_v5_v7(void * in_buff,ssize_t in_buff_cnt,FlowSource_t * fs)287 void Process_v5_v7(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) {
288 netflow_v5_header_t	*v5_header;
289 netflow_v5_record_t *v5_record;
290 exporter_v5_t 		*exporter;
291 extension_map_t		*extension_map;
292 common_record_t		*common_record;
293 uint64_t	start_time, end_time, boot_time;
294 uint32_t   	First, Last;
295 uint16_t	count;
296 uint8_t		flags;
297 int			i, done, version, flow_record_length;
298 ssize_t		size_left;
299 char		*string;
300 
301 		/*
302 		 * v7 is treated as v5. It differes only in the record length, for what we process.
303 		 */
304 
305 		// map v5 data structure to input buffer
306 		v5_header 	= (netflow_v5_header_t *)in_buff;
307 
308 		exporter = GetExporter(fs, v5_header);
309 		if ( !exporter ) {
310 			LogError("Process_v5: Exporter NULL: Abort v5/v7 record processing");
311 			return;
312 		}
313 		exporter->packets++;
314 
315 		// calculate record size depending on counter size
316 		// sigh .. one day I should fix switch to 64bits
317 		if ( exporter->sampler->info.interval == 1 ) {
318 			flags = 0;
319 			v5_output_record_size = v5_output_record_base_size + 8; // 2 x  4 byte counters
320 		} else {
321 			flags = 0;
322 			SetFlag(flags, FLAG_SAMPLED);
323 			SetFlag(flags, FLAG_PKG_64);
324 			SetFlag(flags, FLAG_BYTES_64);
325 			v5_output_record_size = v5_output_record_base_size + 16; // 2 x  8 byte counters
326 		}
327 
328 		extension_map = exporter->extension_map;
329 
330 		version = ntohs(v5_header->version);
331 		flow_record_length = version == 5 ? NETFLOW_V5_RECORD_LENGTH : NETFLOW_V7_RECORD_LENGTH;
332 
333 		// this many data to process
334 		size_left	= in_buff_cnt;
335 
336 		common_record = fs->nffile->buff_ptr;
337 		done = 0;
338 		while ( !done ) {
339 			ipv4_block_t	*ipv4_block;
340 
341 			/* Process header */
342 
343 			// count check
344 	  		count	= ntohs(v5_header->count);
345 			if ( count > NETFLOW_V5_MAX_RECORDS ) {
346 				LogError("Process_v5: Unexpected record count in header: %i. Abort v5/v7 record processing", count);
347 				fs->nffile->buff_ptr = (void *)common_record;
348 				return;
349 			}
350 
351 			// input buffer size check for all expected records
352 			if ( size_left < ( NETFLOW_V5_HEADER_LENGTH + count * flow_record_length) ) {
353 				LogError("Process_v5: Not enough data to process v5 record. Abort v5/v7 record processing");
354 				fs->nffile->buff_ptr = (void *)common_record;
355 				return;
356 			}
357 
358 			// output buffer size check for all expected records
359 			if ( !CheckBufferSpace(fs->nffile, count * v5_output_record_size) ) {
360 				// fishy! - should never happen. maybe disk full?
361 				LogError("Process_v5: output buffer size error. Abort v5/v7 record processing");
362 				return;
363 			}
364 
365 			// map output record to memory buffer
366 			common_record	= (common_record_t *)fs->nffile->buff_ptr;
367 			ipv4_block		= (ipv4_block_t *)common_record->data;
368 
369 			// sequence check
370 			if ( exporter->first ) {
371 				exporter->last_sequence = ntohl(v5_header->flow_sequence);
372 				exporter->sequence 	  	= exporter->last_sequence;
373 				exporter->first 	  	= 0;
374 			} else {
375 				exporter->last_sequence = exporter->sequence;
376 				exporter->sequence 	  = ntohl(v5_header->flow_sequence);
377 				exporter->distance 	  = exporter->sequence - exporter->last_sequence;
378 				// handle overflow
379 				if (exporter->distance < 0) {
380 					exporter->distance = 0xffffffff + exporter->distance  +1;
381 				}
382 				if (exporter->distance != exporter->last_count) {
383 #define delta(a,b) ( (a)>(b) ? (a)-(b) : (b)-(a) )
384 					fs->nffile->stat_record->sequence_failure++;
385 					exporter->sequence_failure++;
386 					/*
387 					LogError("Flow v%d sequence last:%llu now:%llu mismatch. Missing: dist:%lu flows",
388 						version, exporter->last_sequence, exporter->sequence, exporter->distance);
389 					*/
390 
391 				}
392 			}
393 			exporter->last_count  = count;
394 
395 	  		v5_header->SysUptime	 = ntohl(v5_header->SysUptime);
396 	  		v5_header->unix_secs	 = ntohl(v5_header->unix_secs);
397 	  		v5_header->unix_nsecs	 = ntohl(v5_header->unix_nsecs);
398 
399 			/* calculate boot time in msec */
400 			boot_time  = ((uint64_t)(v5_header->unix_secs)*1000 +
401 					((uint64_t)(v5_header->unix_nsecs) / 1000000) ) - (uint64_t)(v5_header->SysUptime);
402 
403 			// process all records
404 			v5_record	= (netflow_v5_record_t *)((pointer_addr_t)v5_header + NETFLOW_V5_HEADER_LENGTH);
405 
406 			/* loop over each records associated with this header */
407 			for (i = 0; i < count; i++) {
408 				pointer_addr_t	bsize;
409 				uint64_t	packets, bytes;
410 				void	*data_ptr;
411 				uint8_t *s1, *s2;
412 				int j, id;
413 				// header data
414 	  			common_record->flags		  = flags;
415 	  			common_record->type			  = CommonRecordType;
416 	  			common_record->exporter_sysid = exporter->info.sysid;;
417 	  			common_record->ext_map		  = extension_map->map_id;
418 	  			common_record->size			  = v5_output_record_size;
419 
420 				// v5 common fields
421 	  			common_record->srcport		  = ntohs(v5_record->srcport);
422 	  			common_record->dstport		  = ntohs(v5_record->dstport);
423 	  			common_record->tcp_flags	  = v5_record->tcp_flags;
424 	  			common_record->prot			  = v5_record->prot;
425 	  			common_record->tos			  = v5_record->tos;
426 	  			common_record->fwd_status 	  = 0;
427 	  			common_record->reserved 	  = 0;
428 
429 				// v5 typed data as fixed struct v5_block
430 	  			ipv4_block->srcaddr	= ntohl(v5_record->srcaddr);
431 	  			ipv4_block->dstaddr	= ntohl(v5_record->dstaddr);
432 
433 				if ( exporter->sampler->info.interval == 1 ) {
434 					value32_t   *v = (value32_t *)ipv4_block->data;
435 
436 	  				packets  	= (uint64_t)ntohl(v5_record->dPkts);
437 	  				bytes		= (uint64_t)ntohl(v5_record->dOctets);
438 
439 					v->val		= packets;
440 					v 			= (value32_t *)v->data;
441 					v->val		= bytes;
442 
443 					data_ptr = (void *)v->data;
444 				} else {
445 					value64_t   *v = (value64_t *)ipv4_block->data;
446 					uint32_t    *ptr = (uint32_t *)&packets;
447 
448 	  				packets  	= (uint64_t)ntohl(v5_record->dPkts)   * (uint64_t)exporter->sampler->info.interval;
449 	  				bytes		= (uint64_t)ntohl(v5_record->dOctets) * (uint64_t)exporter->sampler->info.interval;
450 
451 					// pack packets in 32bit chunks
452 					v->val.val32[0] = ptr[0];
453 					v->val.val32[1] = ptr[1];
454 
455 					// pack bytes in 32bit chunks
456 					v   = (value64_t *)v->data;
457 					ptr = (uint32_t *)&bytes;
458 					v->val.val32[0] = ptr[0];
459 					v->val.val32[1] = ptr[1];
460 
461 					data_ptr = (void *)v->data;
462 				}
463 
464 				// process optional extensions
465 				j = 0;
466 				while ( (id = extension_map->ex_id[j]) != 0 ) {
467 					switch (id) {
468 						case EX_IO_SNMP_2:	{	// 2 byte input/output interface index
469 							tpl_ext_4_t *tpl = (tpl_ext_4_t *)data_ptr;
470 	  						tpl->input  = ntohs(v5_record->input);
471 	  						tpl->output = ntohs(v5_record->output);
472 							data_ptr = (void *)tpl->data;
473 							} break;
474 						case EX_AS_2:	 {	// 2 byte src/dst AS number
475 							tpl_ext_6_t *tpl = (tpl_ext_6_t *)data_ptr;
476 	  						tpl->src_as	= ntohs(v5_record->src_as);
477 	  						tpl->dst_as	= ntohs(v5_record->dst_as);
478 							data_ptr = (void *)tpl->data;
479 							} break;
480 						case EX_MULIPLE:	 {	// dst tos, direction, src/dst mask
481 							tpl_ext_8_t *tpl = (tpl_ext_8_t *)data_ptr;
482 							tpl->dst_tos	= 0;
483 							tpl->dir		= 0;
484 							tpl->src_mask	= v5_record->src_mask;
485 							tpl->dst_mask	= v5_record->dst_mask;
486 							data_ptr = (void *)tpl->data;
487 							} break;
488 						case EX_NEXT_HOP_v4:	 {	// IPv4 next hop
489 							tpl_ext_9_t *tpl = (tpl_ext_9_t *)data_ptr;
490 							tpl->nexthop = ntohl(v5_record->nexthop);
491 							data_ptr = (void *)tpl->data;
492 							} break;
493 						case EX_ROUTER_IP_v4:	 {	// IPv4 router address
494 							tpl_ext_23_t *tpl = (tpl_ext_23_t *)data_ptr;
495 							tpl->router_ip = fs->ip.V4;
496 							data_ptr = (void *)tpl->data;
497 							ClearFlag(common_record->flags, FLAG_IPV6_EXP);
498 							} break;
499 						case EX_ROUTER_ID:	 {	// engine type, engine ID
500 							tpl_ext_25_t *tpl = (tpl_ext_25_t *)data_ptr;
501 							uint16_t	engine_tag = ntohs(v5_header->engine_tag);
502 							tpl->engine_type  = (engine_tag >> 8) & 0xFF;
503 							tpl->engine_id    = (engine_tag & 0xFF);
504 							data_ptr = (void *)tpl->data;
505 							} break;
506 						case EX_RECEIVED: {
507 							tpl_ext_27_t *tpl = (tpl_ext_27_t *)data_ptr;
508 							tpl->received  = (uint64_t)((uint64_t)fs->received.tv_sec * 1000LL) + (uint64_t)((uint64_t)fs->received.tv_usec / 1000LL);
509 							data_ptr = (void *)tpl->data;
510 							} break;
511 
512 						default:
513 							// this should never happen, as v5 has no other extensions
514 							LogError("Process_v5: Unexpected extension %i for v5 record. Skip extension", id);
515 					}
516 					j++;
517 				}
518 
519 				// Time issues
520 	  			First	 				= ntohl(v5_record->First);
521 	  			Last		 			= ntohl(v5_record->Last);
522 
523 #ifdef FIXTIMEBUG
524 				/*
525 				 * Some users report, that they see flows, which have duration time of about 40days
526 				 * which is almost the overflow value. Investigating this, it cannot be an overflow
527 				 * and the difference is always 15160 or 15176 msec too little for a classical
528 				 * overflow. Therefore assume this must be an exporter bug
529 				 */
530 				if ( First > Last && ( (First - Last)  < 20000) ) {
531 					uint32_t _t;
532 					LogError("Process_v5: Unexpected time swap: First 0x%llx smaller than boot time: 0x%llx", start_time, boot_time);
533 					_t= First;
534 					First = Last;
535 					Last = _t;
536 				}
537 #endif
538 				if ( First > Last ) {
539 					/* First in msec, in case of msec overflow, between start and end */
540 					start_time = boot_time - 0x100000000LL + (uint64_t)First;
541 				} else {
542 					start_time = boot_time + (uint64_t)First;
543 				}
544 
545 				/* end time in msecs */
546 				end_time = (uint64_t)Last + boot_time;
547 
548 				// if overflow happened after flow ended but before got exported
549 				// the additional check > 100000 is required due to a CISCO IOS bug
550 				// CSCei12353 - thanks to Bojan
551 				if ( Last > v5_header->SysUptime && (( Last - v5_header->SysUptime) > 100000)) {
552 					start_time  -= 0x100000000LL;
553 					end_time    -= 0x100000000LL;
554 				}
555 
556 				common_record->first 		= start_time/1000;
557 				common_record->msec_first	= start_time - common_record->first*1000;
558 
559 				common_record->last 		= end_time/1000;
560 				common_record->msec_last	= end_time - common_record->last*1000;
561 
562 				// update first_seen, last_seen
563 				if ( start_time < fs->first_seen )
564 					fs->first_seen = start_time;
565 				if ( end_time > fs->last_seen )
566 					fs->last_seen = end_time;
567 
568 
569 				// Update stats
570 				switch (common_record->prot) {
571 					case IPPROTO_ICMP:
572 						fs->nffile->stat_record->numflows_icmp++;
573 						fs->nffile->stat_record->numpackets_icmp += packets;
574 						fs->nffile->stat_record->numbytes_icmp   += bytes;
575 						// fix odd CISCO behaviour for ICMP port/type in src port
576 						if ( common_record->srcport != 0 ) {
577 							s1 = (uint8_t *)&(common_record->srcport);
578 							s2 = (uint8_t *)&(common_record->dstport);
579 							s2[0] = s1[1];
580 							s2[1] = s1[0];
581 							common_record->srcport = 0;
582 						}
583 						break;
584 					case IPPROTO_TCP:
585 						fs->nffile->stat_record->numflows_tcp++;
586 						fs->nffile->stat_record->numpackets_tcp += packets;
587 						fs->nffile->stat_record->numbytes_tcp   += bytes;
588 						break;
589 					case IPPROTO_UDP:
590 						fs->nffile->stat_record->numflows_udp++;
591 						fs->nffile->stat_record->numpackets_udp += packets;
592 						fs->nffile->stat_record->numbytes_udp   += bytes;
593 						break;
594 					default:
595 						fs->nffile->stat_record->numflows_other++;
596 						fs->nffile->stat_record->numpackets_other += packets;
597 						fs->nffile->stat_record->numbytes_other   += bytes;
598 				}
599 				exporter->flows++;
600 				fs->nffile->stat_record->numflows++;
601 				fs->nffile->stat_record->numpackets	+= packets;
602 				fs->nffile->stat_record->numbytes	+= bytes;
603 
604 				if ( verbose ) {
605 					master_record_t master_record;
606 					memset((void *)&master_record, 0, sizeof(master_record_t));
607 					ExpandRecord_v2((common_record_t *)common_record, &v5_extension_info, &(exporter->info), &master_record);
608 				 	flow_record_to_raw(&master_record, &string, 0);
609 					printf("%s\n", string);
610 				}
611 
612 				// advance to next input flow record
613 				v5_record		= (netflow_v5_record_t *)((pointer_addr_t)v5_record + flow_record_length);
614 
615 				if ( ((pointer_addr_t)data_ptr - (pointer_addr_t)common_record) != v5_output_record_size ) {
616 					printf("Panic size check: ptr diff: %llu, record size: %u\n",
617 						(unsigned long long)((pointer_addr_t)data_ptr - (pointer_addr_t)common_record), v5_output_record_size );
618 					abort();
619 				}
620 				// advance to next output record
621 				common_record	= (common_record_t *)data_ptr;
622 				ipv4_block		= (ipv4_block_t *)common_record->data;
623 
624 				// buffer size sanity check - should never happen, but check it anyway
625 				bsize = (pointer_addr_t)common_record - (pointer_addr_t)fs->nffile->block_header - sizeof(data_block_header_t);
626 				if ( bsize >= BUFFSIZE ) {
627 					LogError("### Software error ###: %s line %d", __FILE__, __LINE__);
628 					LogError("Process_v5: Output buffer overflow! Flush buffer and skip records.");
629 					LogError("Buffer size: size: %u, bsize: %llu > %u", fs->nffile->block_header->size, (unsigned long long)bsize, BUFFSIZE);
630 					// reset buffer
631 					fs->nffile->block_header->size 		= 0;
632 					fs->nffile->block_header->NumRecords = 0;
633 					fs->nffile->buff_ptr = (void *)((pointer_addr_t)fs->nffile->block_header + sizeof(data_block_header_t) );
634 					return;
635 				}
636 
637 			} // End of foreach v5 record
638 
639 		// update file record size ( -> output buffer size )
640 		fs->nffile->block_header->NumRecords	+= count;
641 		fs->nffile->block_header->size 		+= count * v5_output_record_size;
642 		fs->nffile->buff_ptr 					= (void *)common_record;
643 
644 		// still to go for this many input bytes
645 		size_left 	-= NETFLOW_V5_HEADER_LENGTH + count * flow_record_length;
646 
647 		// next header
648 		v5_header	= (netflow_v5_header_t *)v5_record;
649 
650 		// should never be < 0
651 		done = size_left <= 0;
652 
653 	} // End of while !done
654 
655 	return;
656 
657 } /* End of Process_v5 */
658 
659 /*
660  * functions used for sending netflow v5 records
661  */
Init_v5_v7_output(send_peer_t * peer)662 void Init_v5_v7_output(send_peer_t *peer) {
663 
664 	v5_output_header = (netflow_v5_header_t *)peer->send_buffer;
665 	v5_output_header->version 		= htons(5);
666 	v5_output_header->SysUptime		= 0;
667 	v5_output_header->unix_secs		= 0;
668 	v5_output_header->unix_nsecs	= 0;
669 	v5_output_header->count 		= 0;
670 	output_engine.first				= 1;
671 
672 	output_engine.sequence		   = 0;
673 	output_engine.last_sequence	   = 0;
674 	output_engine.last_count 	   = 0;
675 	output_engine.sequence_failure = 0;
676 	v5_output_record = (netflow_v5_record_t *)((pointer_addr_t)v5_output_header + (pointer_addr_t)sizeof(netflow_v5_header_t));
677 
678 } // End of Init_v5_v7_output
679 
Add_v5_output_record(master_record_t * master_record,send_peer_t * peer)680 int Add_v5_output_record(master_record_t *master_record, send_peer_t *peer) {
681 static uint64_t	boot_time;	// in msec
682 static int	cnt;
683 extension_map_t *extension_map = master_record->map_ref;
684 uint32_t	i, id, t1, t2;
685 
686 	// Skip IPv6 records
687 	if ( (master_record->flags & FLAG_IPV6_ADDR ) != 0 )
688 		return 0;
689 
690 	if ( output_engine.first ) {	// first time a record is added
691 		// boot time is set one day back - assuming that the start time of every flow does not start ealier
692 		boot_time  			 		= (uint64_t)(master_record->first - 86400)*1000;
693 		v5_output_header->unix_secs = htonl(master_record->first - 86400);
694 		cnt   	 = 0;
695 		output_engine.first 	 = 0;
696 	}
697 	if ( cnt == 0 ) {
698 		peer->buff_ptr  = (void *)((pointer_addr_t)peer->send_buffer + NETFLOW_V5_HEADER_LENGTH);
699 		v5_output_record = (netflow_v5_record_t *)((pointer_addr_t)v5_output_header + (pointer_addr_t)sizeof(netflow_v5_header_t));
700 		output_engine.sequence = output_engine.last_sequence + output_engine.last_count;
701 		v5_output_header->flow_sequence	= htonl(output_engine.sequence);
702 		output_engine.last_sequence = output_engine.sequence;
703 	}
704 
705 	t1 	= (uint32_t)(1000LL * (uint64_t)master_record->first + (uint64_t)master_record->msec_first - boot_time);
706 	t2	= (uint32_t)(1000LL * (uint64_t)master_record->last  + (uint64_t)master_record->msec_last - boot_time);
707   	v5_output_record->First		= htonl(t1);
708   	v5_output_record->Last		= htonl(t2);
709 
710 	v5_output_record->srcaddr	= htonl(master_record->V4.srcaddr);
711   	v5_output_record->dstaddr	= htonl(master_record->V4.dstaddr);
712 
713   	v5_output_record->srcport	= htons(master_record->srcport);
714   	v5_output_record->dstport	= htons(master_record->dstport);
715   	v5_output_record->tcp_flags = master_record->tcp_flags;
716   	v5_output_record->prot		= master_record->prot;
717   	v5_output_record->tos		= master_record->tos;
718 
719 	// the 64bit counters are cut down to 32 bits for v5
720   	v5_output_record->dPkts		= htonl((uint32_t)master_record->dPkts);
721   	v5_output_record->dOctets	= htonl((uint32_t)master_record->dOctets);
722 
723   	v5_output_record->input		= htons(master_record->input);
724   	v5_output_record->output	= htons(master_record->output);
725   	v5_output_record->src_as	= htons(master_record->srcas);
726   	v5_output_record->dst_as	= htons(master_record->dstas);
727 	v5_output_record->src_mask 	= 0;
728 	v5_output_record->dst_mask 	= 0;
729 	v5_output_record->pad1 		= 0;
730 	v5_output_record->pad2 		= 0;
731   	v5_output_record->nexthop	= 0;
732 
733 	i = 0;
734 	while ( (id = extension_map->ex_id[i]) != 0 ) {
735 		switch (id) {
736 			case EX_IO_SNMP_2:
737   				v5_output_record->input		= htons(master_record->input);
738   				v5_output_record->output	= htons(master_record->output);
739 				break;
740 			case EX_AS_2:
741   				v5_output_record->src_as	= htons(master_record->srcas);
742   				v5_output_record->dst_as	= htons(master_record->dstas);
743 				break;
744 			case EX_MULIPLE:
745 				v5_output_record->src_mask 	= master_record->src_mask;
746 				v5_output_record->dst_mask 	= master_record->dst_mask;
747 				break;
748 			case EX_NEXT_HOP_v4:
749 				v5_output_record->nexthop	= htonl(master_record->ip_nexthop.V4);
750 				break;
751 			// default: Other extensions can not be sent with v5
752 		}
753 		i++;
754 	}
755 	cnt++;
756 
757 	v5_output_header->count 	= htons(cnt);
758 	peer->buff_ptr = (void *)((pointer_addr_t)peer->buff_ptr + NETFLOW_V5_RECORD_LENGTH);
759 	v5_output_record++;
760 	if ( cnt == NETFLOW_V5_MAX_RECORDS ) {
761 		peer->flush = 1;
762 		output_engine.last_count 	  = cnt;
763 		cnt = 0;
764 	}
765 
766 	return 0;
767 
768 } // End of Add_v5_output_record
769