1 /*
2  *  Copyright (c) 2009-2020, Peter Haag
3  *  Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4  *  All rights reserved.
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright notice,
12  *     this list of conditions and the following disclaimer in the documentation
13  *     and/or other materials provided with the distribution.
14  *   * Neither the name of the author nor the names of its contributors may be
15  *     used to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  *  POSSIBILITY OF SUCH DAMAGE.
29  *
30  */
31 
32 #include "config.h"
33 
34 #include <sys/types.h>
35 #include <sys/uio.h>
36 #include <unistd.h>
37 #include <string.h>
38 #include <fcntl.h>
39 #include <sys/stat.h>
40 #include <sys/param.h>
41 #include <netinet/in.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <stdlib.h>
46 
47 #ifdef HAVE_STDINT_H
48 #include <stdint.h>
49 #endif
50 
51 #include "util.h"
52 #include "nfdump.h"
53 #include "nffile.h"
54 #include "nfx.h"
55 #include "nfstat.h"
56 #include "nflowcache.h"
57 #include "exporter.h"
58 #include "output_util.h"
59 #include "nfexport.h"
60 
61 #include "nfdump_inline.c"
62 
63 #define NEED_PACKRECORD 1
64 #include "nffile_inline.c"
65 #undef NEED_PACKRECORD
66 
67 #include "heapsort_inline.c"
68 #include "applybits_inline.c"
69 
70 /* global vars */
71 extern extension_descriptor_t extension_descriptor[];
72 
73 /* local vars */
74 enum CntIndices { FLOWS = 0, INPACKETS, INBYTES, OUTPACKETS, OUTBYTES };
75 
76 static void ExportExtensionMaps( int aggregate, int bidir, nffile_t *nffile, extension_map_list_t *extension_map_list );
77 
ExportExtensionMaps(int aggregate,int bidir,nffile_t * nffile,extension_map_list_t * extension_map_list)78 static void ExportExtensionMaps( int aggregate, int bidir, nffile_t *nffile, extension_map_list_t *extension_map_list ) {
79 int map_id, opt_extensions, num_extensions, new_map_size, opt_align;
80 extension_map_t	*new_map;
81 
82 	// no extension maps to export - nothing to do
83 	if ( extension_map_list->map_list == NULL )
84 		return;
85 
86 	new_map = NULL;
87 
88 	for ( map_id = 0; map_id <= extension_map_list->max_used; map_id++ ) {
89 		extension_map_t *SourceMap = extension_map_list->slot[map_id]->map;
90 		int i, has_aggr_flows, has_out_bytes, has_out_packets, has_nat;
91 		// skip maps, never referenced
92 
93 #ifdef DEVEL
94 		printf("Process map id: %i\n", map_id);
95 		printf("Ref count: %i\n", extension_map_list->slot[map_id]->ref_count);
96 #endif
97 
98 		if ( extension_map_list->slot[map_id]->ref_count == 0 ) {
99 #ifdef DEVEL
100 			printf("Ref count = 0 => Skip map\n");
101 #endif
102 			continue;
103 		}
104 
105 		// parse Source map if it contains all required fields:
106 		// for aggregation EX_AGGR_FLOWS_4 or _8 is required
107 		// for bidir flows EX_OUT_PKG_4 or _8 and EX_OUT_BYTES_4 or_8 are required
108 		has_aggr_flows  = 0;
109 		has_out_bytes	= 0;
110 		has_out_packets	= 0;
111 		// parse map for older NEL nat extension
112 		has_nat			= 0;
113 
114 		int needConvert = 0;
115 		num_extensions = 0;
116 		i = 0;
117 		while ( SourceMap->ex_id[i] ) {
118 			switch (SourceMap->ex_id[i]) {
119 				case EX_AGGR_FLOWS_4:
120 					needConvert = 1;
121 				case EX_AGGR_FLOWS_8:
122 					has_aggr_flows  = 1;
123 					break;
124 				case EX_OUT_BYTES_4:
125 					needConvert = 1;
126 				case EX_OUT_BYTES_8:
127 					has_out_bytes	= 1;
128 					break;
129 				case EX_OUT_PKG_4:
130 					needConvert = 1;
131 				case EX_OUT_PKG_8:
132 					has_out_packets	= 1;
133 					break;
134 				case EX_NEL_GLOBAL_IP_v4:
135 					// Map old nat extension to common NSEL extension
136 					SourceMap->ex_id[i] = EX_NSEL_XLATE_IP_v4;
137 					has_nat	= 1;
138 				// default: nothing to do
139 			}
140 			i++;
141 			num_extensions++;
142 		}
143 #ifdef DEVEL
144 		printf("map: num_extensions: %i, has_aggr_flows: %i, has_out_bytes: %i, has_out_packets: %i, has_nat: %i, needConvert: %i\n",
145 			num_extensions, has_aggr_flows, has_out_bytes, has_out_packets, has_nat, needConvert);
146 #endif
147 
148 		// count missing extensions
149 		opt_extensions = 0;
150 		if ( aggregate && !has_aggr_flows )
151 			opt_extensions++;
152 
153 		if ( bidir && !has_out_bytes )
154 			opt_extensions++;
155 
156 		if ( bidir && !has_out_packets )
157 			opt_extensions++;
158 
159 		opt_extensions += has_nat;
160 		// calculate new map size
161 		new_map_size = sizeof(extension_map_t) + ( num_extensions + opt_extensions) * sizeof(uint16_t);
162 
163 #ifdef DEVEL
164 		printf("opt_extensions: %i, new_map_size: %i\n", opt_extensions,new_map_size );
165 		PrintExtensionMap(SourceMap);
166 #endif
167 		if ( opt_extensions || needConvert ) {
168     		// align 32bits
169     		if (( new_map_size & 0x3 ) != 0 ) {
170         		new_map_size += 4 - ( new_map_size & 0x3 );
171 				opt_align = 1;
172     		} else {
173 				opt_align = 0;
174 			}
175 		} else {
176 			// no missing elements in extension map - we can used the original one
177 			// and we are done
178 
179 #ifdef DEVEL
180 			printf("New map identical => use this map:\n");
181 			PrintExtensionMap(SourceMap);
182 #endif
183 			// Flush the map to disk
184 			AppendToBuffer(nffile, (void *)SourceMap, SourceMap->size);
185 			continue;
186 		}
187 
188 #ifdef DEVEL
189 		printf("Create new map:\n");
190 #endif
191 		// new map is different - create the new map
192 		new_map = (extension_map_t *)malloc((ssize_t)new_map_size);
193 		if ( !new_map ) {
194 			LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
195 			exit(255);
196 		}
197 
198 		// Panic check - should never happen, but we are going to copy memory
199 		if ( new_map_size < SourceMap->size ) {
200 			LogError("PANIC! new_map_size(%i) < SourceMap->size(%i) in %s line %d\n",
201 				new_map_size, SourceMap->size,  __FILE__, __LINE__);
202 			exit(255);
203 		}
204 		// copy existing map
205 		memcpy((void *)new_map, (void *)SourceMap, SourceMap->size);
206 
207 		new_map->size   = new_map_size;
208 
209 		i = 0;
210 		// convert 4 to 8 byte counters
211 		while ( new_map->ex_id[i] ) {
212 			switch (new_map->ex_id[i]) {
213 				case EX_AGGR_FLOWS_4:
214 					new_map->extension_size -= extension_descriptor[EX_AGGR_FLOWS_4].size;
215 					new_map->extension_size += extension_descriptor[EX_AGGR_FLOWS_8].size;
216 					new_map->ex_id[i] = EX_AGGR_FLOWS_8;
217 					break;
218 				case EX_OUT_BYTES_4:
219 					new_map->extension_size -= extension_descriptor[EX_OUT_BYTES_4].size;
220 					new_map->extension_size += extension_descriptor[EX_OUT_BYTES_8].size;
221 					new_map->ex_id[i] = EX_OUT_BYTES_8;
222 					break;
223 				case EX_OUT_PKG_4:
224 					new_map->extension_size -= extension_descriptor[EX_OUT_PKG_4].size;
225 					new_map->extension_size += extension_descriptor[EX_OUT_PKG_8].size;
226 					new_map->ex_id[i] = EX_OUT_PKG_8;
227 					break;
228 				// default: nothing to do
229 			}
230 			i++;
231 		}
232 
233 		// add the missing extensions to the output map
234 		if ( has_nat ) {
235 			new_map->ex_id[i++] 	 = EX_NSEL_XLATE_PORTS;
236 			new_map->extension_size += extension_descriptor[EX_NSEL_XLATE_PORTS].size;
237 		}
238 		// add missing map elements
239 		if ( aggregate && !has_aggr_flows ) {
240 			new_map->ex_id[i++] 	 = EX_AGGR_FLOWS_8;
241 			new_map->extension_size += extension_descriptor[EX_AGGR_FLOWS_8].size;
242 		}
243 		if ( bidir && !has_out_bytes )  {
244 			new_map->ex_id[i++] 	 = EX_OUT_BYTES_8;
245 			new_map->extension_size += extension_descriptor[EX_OUT_BYTES_8].size;
246 		}
247 		if ( bidir && !has_out_packets )  {
248 			new_map->ex_id[i++] 	 = EX_OUT_PKG_8;
249 			new_map->extension_size += extension_descriptor[EX_OUT_PKG_8].size;
250 		}
251 		// end of map tag
252 		new_map->ex_id[i++]    = 0;
253 		if ( opt_align )
254 			new_map->ex_id[i]  = 0;
255 
256 #ifdef DEVEL
257 		printf("New/converted extension map:\n");
258 		PrintExtensionMap(new_map);
259 #endif
260 
261 		// set new export map
262 		extension_map_list->slot[map_id]->exportMap = new_map;
263 
264 		// Flush the map to disk
265 		AppendToBuffer(nffile, (void *)new_map, new_map->size);
266 
267 	}
268 
269 } // End of ExportExtensionMaps
270 
ExportFlowTable(nffile_t * nffile,int aggregate,int bidir,int GuessDir,int date_sorted,extension_map_list_t * extension_map_list)271 int ExportFlowTable(nffile_t *nffile, int aggregate, int bidir, int GuessDir, int date_sorted, extension_map_list_t *extension_map_list) {
272 hash_FlowTable *FlowTable;
273 FlowTableRecord_t	*r;
274 SortElement_t 		*SortList;
275 master_record_t		*aggr_record_mask;
276 uint32_t 			i;
277 uint32_t			maxindex, c;
278 #ifdef DEVEL
279 char				*string;
280 #endif
281 
282 	ExportExtensionMaps(aggregate, bidir, nffile, extension_map_list);
283 	ExportExporterList(nffile);
284 
285 	aggr_record_mask = GetMasterAggregateMask();
286 
287 	FlowTable = GetFlowTable();
288 	c = 0;
289 	maxindex = FlowTable->NumRecords;
290 	if ( date_sorted ) {
291 		// Sort records according the date
292 		SortList = (SortElement_t *)calloc(maxindex, sizeof(SortElement_t));
293 
294 		if ( !SortList ) {
295 			LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
296 			return 0;
297 		}
298 
299 		// preset SortList table - still unsorted
300 		for ( i=0; i<=FlowTable->IndexMask; i++ ) {
301 			r = FlowTable->bucket[i];
302 			if ( !r )
303 				continue;
304 
305 			// foreach elem in this bucket
306 			while ( r ) {
307 				SortList[c].count  = 1000LL * r->flowrecord.first + r->flowrecord.msec_first;	// sort according the date
308 				SortList[c].record = (void *)r;
309 				c++;
310 				r = r->next;
311 			}
312 		}
313 
314 		if ( c != maxindex ) {
315 			LogError("Abort: Mismatch %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
316 			return 0;
317 		}
318 
319 		if ( c >= 2 )
320  			heapSort(SortList, c, 0);
321 
322 		for ( i = 0; i < c; i++ ) {
323 			master_record_t	*flow_record;
324 			common_record_t *raw_record;
325 			extension_info_t *extension_info;
326 
327 			r = (FlowTableRecord_t *)(SortList[i].record);
328 			raw_record = &(r->flowrecord);
329 			extension_info = r->map_info_ref;
330 
331 			flow_record = &(extension_info->master_record);
332 			ExpandRecord_v2( raw_record, extension_info, r->exp_ref, flow_record);
333 			flow_record->dPkts 		= r->counter[INPACKETS];
334 			flow_record->dOctets 	= r->counter[INBYTES];
335 			flow_record->out_pkts 	= r->counter[OUTPACKETS];
336 			flow_record->out_bytes 	= r->counter[OUTBYTES];
337 			flow_record->aggr_flows = r->counter[FLOWS];
338 
339 			// apply IP mask from aggregation, to provide a pretty output
340 			if ( FlowTable->has_masks ) {
341 				flow_record->V6.srcaddr[0] &= FlowTable->IPmask[0];
342 				flow_record->V6.srcaddr[1] &= FlowTable->IPmask[1];
343 				flow_record->V6.dstaddr[0] &= FlowTable->IPmask[2];
344 				flow_record->V6.dstaddr[1] &= FlowTable->IPmask[3];
345 			}
346 
347 			if ( FlowTable->apply_netbits )
348 				ApplyNetMaskBits(flow_record, FlowTable->apply_netbits);
349 
350 			if ( aggr_record_mask ) {
351 				ApplyAggrMask(flow_record, aggr_record_mask);
352 			}
353 
354 			if ( GuessDir &&
355 			   ( flow_record->prot == IPPROTO_TCP || flow_record->prot == IPPROTO_UDP) &&
356 			   ( flow_record->srcport < 1024 ) && ( flow_record->dstport > 1024 ) &&
357 			   ( flow_record->srcport < flow_record->dstport ) ) {
358 				SwapFlow(flow_record);
359 			}
360 
361 			// switch to output extension map
362 			flow_record->map_ref = extension_info->exportMap ? extension_info->exportMap : extension_info->map;
363 			flow_record->ext_map = flow_record->map_ref->map_id;
364 			PackRecord(flow_record, nffile);
365 #ifdef DEVEL
366 			flow_record_to_raw((void *)flow_record, &string, 0);
367 			printf("%s\n", string);
368 #endif
369 			// Update statistics
370 			UpdateStat(nffile->stat_record, flow_record);
371 		}
372 
373 	} else {
374 		// print them as they came
375 		for ( i=0; i<=FlowTable->IndexMask; i++ ) {
376 			r = FlowTable->bucket[i];
377 			while ( r ) {
378 				master_record_t	*flow_record;
379 				common_record_t *raw_record;
380 				extension_info_t *extension_info;
381 
382 				raw_record = &(r->flowrecord);
383 				extension_info = r->map_info_ref;
384 
385 				flow_record = &(extension_info->master_record);
386 				ExpandRecord_v2(raw_record, extension_info, r->exp_ref, flow_record);
387 				flow_record->dPkts 		= r->counter[INPACKETS];
388 				flow_record->dOctets 	= r->counter[INBYTES];
389 				flow_record->out_pkts 	= r->counter[OUTPACKETS];
390 				flow_record->out_bytes 	= r->counter[OUTBYTES];
391 				flow_record->aggr_flows	= r->counter[FLOWS];
392 
393 				// apply IP mask from aggregation, to provide a pretty output
394 				if ( FlowTable->has_masks ) {
395 					flow_record->V6.srcaddr[0] &= FlowTable->IPmask[0];
396 					flow_record->V6.srcaddr[1] &= FlowTable->IPmask[1];
397 					flow_record->V6.dstaddr[0] &= FlowTable->IPmask[2];
398 					flow_record->V6.dstaddr[1] &= FlowTable->IPmask[3];
399 				}
400 
401 				if ( FlowTable->apply_netbits )
402 					ApplyNetMaskBits(flow_record, FlowTable->apply_netbits);
403 
404 				if ( aggr_record_mask ) {
405 					ApplyAggrMask(flow_record, aggr_record_mask);
406 				}
407 
408 				if ( GuessDir &&
409 			   	   ( flow_record->prot == IPPROTO_TCP || flow_record->prot == IPPROTO_UDP) &&
410 			   	   ( flow_record->srcport < 1024 ) && ( flow_record->dstport > 1024 ) &&
411 			   	   ( flow_record->srcport < flow_record->dstport ) ) {
412 					SwapFlow(flow_record);
413 				}
414 
415 				// switch to output extension map
416 				flow_record->map_ref = extension_info->exportMap ? extension_info->exportMap : extension_info->map;
417 				flow_record->ext_map = flow_record->map_ref->map_id;
418 				PackRecord(flow_record, nffile);
419 #ifdef DEVEL
420 				flow_record_to_raw((void *)flow_record, &string, 0);
421 				printf("%s\n", string);
422 #endif
423 				// Update statistics
424 				UpdateStat(nffile->stat_record, flow_record);
425 
426 				r = r->next;
427 			}
428 		}
429 
430 	}
431 
432     if ( nffile->block_header->NumRecords ) {
433         if ( WriteBlock(nffile) <= 0 ) {
434             LogError("Failed to write output buffer to disk: '%s'" , strerror(errno));
435 			return 0;
436         }
437     }
438 
439 	return 1;
440 
441 } // End of ExportFlowTable
442 
443 
444