1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #include "config.h"
33
34 #include <sys/types.h>
35 #include <sys/uio.h>
36 #include <unistd.h>
37 #include <string.h>
38 #include <fcntl.h>
39 #include <sys/stat.h>
40 #include <sys/param.h>
41 #include <netinet/in.h>
42 #include <stdio.h>
43 #include <errno.h>
44 #include <unistd.h>
45 #include <stdlib.h>
46
47 #ifdef HAVE_STDINT_H
48 #include <stdint.h>
49 #endif
50
51 #include "util.h"
52 #include "nfdump.h"
53 #include "nffile.h"
54 #include "nfx.h"
55 #include "nfstat.h"
56 #include "nflowcache.h"
57 #include "exporter.h"
58 #include "output_util.h"
59 #include "nfexport.h"
60
61 #include "nfdump_inline.c"
62
63 #define NEED_PACKRECORD 1
64 #include "nffile_inline.c"
65 #undef NEED_PACKRECORD
66
67 #include "heapsort_inline.c"
68 #include "applybits_inline.c"
69
70 /* global vars */
71 extern extension_descriptor_t extension_descriptor[];
72
73 /* local vars */
74 enum CntIndices { FLOWS = 0, INPACKETS, INBYTES, OUTPACKETS, OUTBYTES };
75
76 static void ExportExtensionMaps( int aggregate, int bidir, nffile_t *nffile, extension_map_list_t *extension_map_list );
77
ExportExtensionMaps(int aggregate,int bidir,nffile_t * nffile,extension_map_list_t * extension_map_list)78 static void ExportExtensionMaps( int aggregate, int bidir, nffile_t *nffile, extension_map_list_t *extension_map_list ) {
79 int map_id, opt_extensions, num_extensions, new_map_size, opt_align;
80 extension_map_t *new_map;
81
82 // no extension maps to export - nothing to do
83 if ( extension_map_list->map_list == NULL )
84 return;
85
86 new_map = NULL;
87
88 for ( map_id = 0; map_id <= extension_map_list->max_used; map_id++ ) {
89 extension_map_t *SourceMap = extension_map_list->slot[map_id]->map;
90 int i, has_aggr_flows, has_out_bytes, has_out_packets, has_nat;
91 // skip maps, never referenced
92
93 #ifdef DEVEL
94 printf("Process map id: %i\n", map_id);
95 printf("Ref count: %i\n", extension_map_list->slot[map_id]->ref_count);
96 #endif
97
98 if ( extension_map_list->slot[map_id]->ref_count == 0 ) {
99 #ifdef DEVEL
100 printf("Ref count = 0 => Skip map\n");
101 #endif
102 continue;
103 }
104
105 // parse Source map if it contains all required fields:
106 // for aggregation EX_AGGR_FLOWS_4 or _8 is required
107 // for bidir flows EX_OUT_PKG_4 or _8 and EX_OUT_BYTES_4 or_8 are required
108 has_aggr_flows = 0;
109 has_out_bytes = 0;
110 has_out_packets = 0;
111 // parse map for older NEL nat extension
112 has_nat = 0;
113
114 int needConvert = 0;
115 num_extensions = 0;
116 i = 0;
117 while ( SourceMap->ex_id[i] ) {
118 switch (SourceMap->ex_id[i]) {
119 case EX_AGGR_FLOWS_4:
120 needConvert = 1;
121 case EX_AGGR_FLOWS_8:
122 has_aggr_flows = 1;
123 break;
124 case EX_OUT_BYTES_4:
125 needConvert = 1;
126 case EX_OUT_BYTES_8:
127 has_out_bytes = 1;
128 break;
129 case EX_OUT_PKG_4:
130 needConvert = 1;
131 case EX_OUT_PKG_8:
132 has_out_packets = 1;
133 break;
134 case EX_NEL_GLOBAL_IP_v4:
135 // Map old nat extension to common NSEL extension
136 SourceMap->ex_id[i] = EX_NSEL_XLATE_IP_v4;
137 has_nat = 1;
138 // default: nothing to do
139 }
140 i++;
141 num_extensions++;
142 }
143 #ifdef DEVEL
144 printf("map: num_extensions: %i, has_aggr_flows: %i, has_out_bytes: %i, has_out_packets: %i, has_nat: %i, needConvert: %i\n",
145 num_extensions, has_aggr_flows, has_out_bytes, has_out_packets, has_nat, needConvert);
146 #endif
147
148 // count missing extensions
149 opt_extensions = 0;
150 if ( aggregate && !has_aggr_flows )
151 opt_extensions++;
152
153 if ( bidir && !has_out_bytes )
154 opt_extensions++;
155
156 if ( bidir && !has_out_packets )
157 opt_extensions++;
158
159 opt_extensions += has_nat;
160 // calculate new map size
161 new_map_size = sizeof(extension_map_t) + ( num_extensions + opt_extensions) * sizeof(uint16_t);
162
163 #ifdef DEVEL
164 printf("opt_extensions: %i, new_map_size: %i\n", opt_extensions,new_map_size );
165 PrintExtensionMap(SourceMap);
166 #endif
167 if ( opt_extensions || needConvert ) {
168 // align 32bits
169 if (( new_map_size & 0x3 ) != 0 ) {
170 new_map_size += 4 - ( new_map_size & 0x3 );
171 opt_align = 1;
172 } else {
173 opt_align = 0;
174 }
175 } else {
176 // no missing elements in extension map - we can used the original one
177 // and we are done
178
179 #ifdef DEVEL
180 printf("New map identical => use this map:\n");
181 PrintExtensionMap(SourceMap);
182 #endif
183 // Flush the map to disk
184 AppendToBuffer(nffile, (void *)SourceMap, SourceMap->size);
185 continue;
186 }
187
188 #ifdef DEVEL
189 printf("Create new map:\n");
190 #endif
191 // new map is different - create the new map
192 new_map = (extension_map_t *)malloc((ssize_t)new_map_size);
193 if ( !new_map ) {
194 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno) );
195 exit(255);
196 }
197
198 // Panic check - should never happen, but we are going to copy memory
199 if ( new_map_size < SourceMap->size ) {
200 LogError("PANIC! new_map_size(%i) < SourceMap->size(%i) in %s line %d\n",
201 new_map_size, SourceMap->size, __FILE__, __LINE__);
202 exit(255);
203 }
204 // copy existing map
205 memcpy((void *)new_map, (void *)SourceMap, SourceMap->size);
206
207 new_map->size = new_map_size;
208
209 i = 0;
210 // convert 4 to 8 byte counters
211 while ( new_map->ex_id[i] ) {
212 switch (new_map->ex_id[i]) {
213 case EX_AGGR_FLOWS_4:
214 new_map->extension_size -= extension_descriptor[EX_AGGR_FLOWS_4].size;
215 new_map->extension_size += extension_descriptor[EX_AGGR_FLOWS_8].size;
216 new_map->ex_id[i] = EX_AGGR_FLOWS_8;
217 break;
218 case EX_OUT_BYTES_4:
219 new_map->extension_size -= extension_descriptor[EX_OUT_BYTES_4].size;
220 new_map->extension_size += extension_descriptor[EX_OUT_BYTES_8].size;
221 new_map->ex_id[i] = EX_OUT_BYTES_8;
222 break;
223 case EX_OUT_PKG_4:
224 new_map->extension_size -= extension_descriptor[EX_OUT_PKG_4].size;
225 new_map->extension_size += extension_descriptor[EX_OUT_PKG_8].size;
226 new_map->ex_id[i] = EX_OUT_PKG_8;
227 break;
228 // default: nothing to do
229 }
230 i++;
231 }
232
233 // add the missing extensions to the output map
234 if ( has_nat ) {
235 new_map->ex_id[i++] = EX_NSEL_XLATE_PORTS;
236 new_map->extension_size += extension_descriptor[EX_NSEL_XLATE_PORTS].size;
237 }
238 // add missing map elements
239 if ( aggregate && !has_aggr_flows ) {
240 new_map->ex_id[i++] = EX_AGGR_FLOWS_8;
241 new_map->extension_size += extension_descriptor[EX_AGGR_FLOWS_8].size;
242 }
243 if ( bidir && !has_out_bytes ) {
244 new_map->ex_id[i++] = EX_OUT_BYTES_8;
245 new_map->extension_size += extension_descriptor[EX_OUT_BYTES_8].size;
246 }
247 if ( bidir && !has_out_packets ) {
248 new_map->ex_id[i++] = EX_OUT_PKG_8;
249 new_map->extension_size += extension_descriptor[EX_OUT_PKG_8].size;
250 }
251 // end of map tag
252 new_map->ex_id[i++] = 0;
253 if ( opt_align )
254 new_map->ex_id[i] = 0;
255
256 #ifdef DEVEL
257 printf("New/converted extension map:\n");
258 PrintExtensionMap(new_map);
259 #endif
260
261 // set new export map
262 extension_map_list->slot[map_id]->exportMap = new_map;
263
264 // Flush the map to disk
265 AppendToBuffer(nffile, (void *)new_map, new_map->size);
266
267 }
268
269 } // End of ExportExtensionMaps
270
ExportFlowTable(nffile_t * nffile,int aggregate,int bidir,int GuessDir,int date_sorted,extension_map_list_t * extension_map_list)271 int ExportFlowTable(nffile_t *nffile, int aggregate, int bidir, int GuessDir, int date_sorted, extension_map_list_t *extension_map_list) {
272 hash_FlowTable *FlowTable;
273 FlowTableRecord_t *r;
274 SortElement_t *SortList;
275 master_record_t *aggr_record_mask;
276 uint32_t i;
277 uint32_t maxindex, c;
278 #ifdef DEVEL
279 char *string;
280 #endif
281
282 ExportExtensionMaps(aggregate, bidir, nffile, extension_map_list);
283 ExportExporterList(nffile);
284
285 aggr_record_mask = GetMasterAggregateMask();
286
287 FlowTable = GetFlowTable();
288 c = 0;
289 maxindex = FlowTable->NumRecords;
290 if ( date_sorted ) {
291 // Sort records according the date
292 SortList = (SortElement_t *)calloc(maxindex, sizeof(SortElement_t));
293
294 if ( !SortList ) {
295 LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
296 return 0;
297 }
298
299 // preset SortList table - still unsorted
300 for ( i=0; i<=FlowTable->IndexMask; i++ ) {
301 r = FlowTable->bucket[i];
302 if ( !r )
303 continue;
304
305 // foreach elem in this bucket
306 while ( r ) {
307 SortList[c].count = 1000LL * r->flowrecord.first + r->flowrecord.msec_first; // sort according the date
308 SortList[c].record = (void *)r;
309 c++;
310 r = r->next;
311 }
312 }
313
314 if ( c != maxindex ) {
315 LogError("Abort: Mismatch %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
316 return 0;
317 }
318
319 if ( c >= 2 )
320 heapSort(SortList, c, 0);
321
322 for ( i = 0; i < c; i++ ) {
323 master_record_t *flow_record;
324 common_record_t *raw_record;
325 extension_info_t *extension_info;
326
327 r = (FlowTableRecord_t *)(SortList[i].record);
328 raw_record = &(r->flowrecord);
329 extension_info = r->map_info_ref;
330
331 flow_record = &(extension_info->master_record);
332 ExpandRecord_v2( raw_record, extension_info, r->exp_ref, flow_record);
333 flow_record->dPkts = r->counter[INPACKETS];
334 flow_record->dOctets = r->counter[INBYTES];
335 flow_record->out_pkts = r->counter[OUTPACKETS];
336 flow_record->out_bytes = r->counter[OUTBYTES];
337 flow_record->aggr_flows = r->counter[FLOWS];
338
339 // apply IP mask from aggregation, to provide a pretty output
340 if ( FlowTable->has_masks ) {
341 flow_record->V6.srcaddr[0] &= FlowTable->IPmask[0];
342 flow_record->V6.srcaddr[1] &= FlowTable->IPmask[1];
343 flow_record->V6.dstaddr[0] &= FlowTable->IPmask[2];
344 flow_record->V6.dstaddr[1] &= FlowTable->IPmask[3];
345 }
346
347 if ( FlowTable->apply_netbits )
348 ApplyNetMaskBits(flow_record, FlowTable->apply_netbits);
349
350 if ( aggr_record_mask ) {
351 ApplyAggrMask(flow_record, aggr_record_mask);
352 }
353
354 if ( GuessDir &&
355 ( flow_record->prot == IPPROTO_TCP || flow_record->prot == IPPROTO_UDP) &&
356 ( flow_record->srcport < 1024 ) && ( flow_record->dstport > 1024 ) &&
357 ( flow_record->srcport < flow_record->dstport ) ) {
358 SwapFlow(flow_record);
359 }
360
361 // switch to output extension map
362 flow_record->map_ref = extension_info->exportMap ? extension_info->exportMap : extension_info->map;
363 flow_record->ext_map = flow_record->map_ref->map_id;
364 PackRecord(flow_record, nffile);
365 #ifdef DEVEL
366 flow_record_to_raw((void *)flow_record, &string, 0);
367 printf("%s\n", string);
368 #endif
369 // Update statistics
370 UpdateStat(nffile->stat_record, flow_record);
371 }
372
373 } else {
374 // print them as they came
375 for ( i=0; i<=FlowTable->IndexMask; i++ ) {
376 r = FlowTable->bucket[i];
377 while ( r ) {
378 master_record_t *flow_record;
379 common_record_t *raw_record;
380 extension_info_t *extension_info;
381
382 raw_record = &(r->flowrecord);
383 extension_info = r->map_info_ref;
384
385 flow_record = &(extension_info->master_record);
386 ExpandRecord_v2(raw_record, extension_info, r->exp_ref, flow_record);
387 flow_record->dPkts = r->counter[INPACKETS];
388 flow_record->dOctets = r->counter[INBYTES];
389 flow_record->out_pkts = r->counter[OUTPACKETS];
390 flow_record->out_bytes = r->counter[OUTBYTES];
391 flow_record->aggr_flows = r->counter[FLOWS];
392
393 // apply IP mask from aggregation, to provide a pretty output
394 if ( FlowTable->has_masks ) {
395 flow_record->V6.srcaddr[0] &= FlowTable->IPmask[0];
396 flow_record->V6.srcaddr[1] &= FlowTable->IPmask[1];
397 flow_record->V6.dstaddr[0] &= FlowTable->IPmask[2];
398 flow_record->V6.dstaddr[1] &= FlowTable->IPmask[3];
399 }
400
401 if ( FlowTable->apply_netbits )
402 ApplyNetMaskBits(flow_record, FlowTable->apply_netbits);
403
404 if ( aggr_record_mask ) {
405 ApplyAggrMask(flow_record, aggr_record_mask);
406 }
407
408 if ( GuessDir &&
409 ( flow_record->prot == IPPROTO_TCP || flow_record->prot == IPPROTO_UDP) &&
410 ( flow_record->srcport < 1024 ) && ( flow_record->dstport > 1024 ) &&
411 ( flow_record->srcport < flow_record->dstport ) ) {
412 SwapFlow(flow_record);
413 }
414
415 // switch to output extension map
416 flow_record->map_ref = extension_info->exportMap ? extension_info->exportMap : extension_info->map;
417 flow_record->ext_map = flow_record->map_ref->map_id;
418 PackRecord(flow_record, nffile);
419 #ifdef DEVEL
420 flow_record_to_raw((void *)flow_record, &string, 0);
421 printf("%s\n", string);
422 #endif
423 // Update statistics
424 UpdateStat(nffile->stat_record, flow_record);
425
426 r = r->next;
427 }
428 }
429
430 }
431
432 if ( nffile->block_header->NumRecords ) {
433 if ( WriteBlock(nffile) <= 0 ) {
434 LogError("Failed to write output buffer to disk: '%s'" , strerror(errno));
435 return 0;
436 }
437 }
438
439 return 1;
440
441 } // End of ExportFlowTable
442
443
444