1 /*
2 * Copyright (c) 2009-2020, Peter Haag
3 * Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of the author nor the names of its contributors may be
15 * used to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35
36 #include <stdio.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40 #include <stdlib.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <errno.h>
44 #include <string.h>
45
46 #ifdef HAVE_STDINT_H
47 #include <stdint.h>
48 #endif
49
50 #include "util.h"
51 #include "nfdump.h"
52 #include "nffile.h"
53 #include "nfx.h"
54 #include "nfnet.h"
55 #include "output_raw.h"
56 #include "bookkeeper.h"
57 #include "collector.h"
58 #include "exporter.h"
59 #include "netflow_v5_v7.h"
60
61 extern extension_descriptor_t extension_descriptor[];
62
63 /* module limited globals */
64 static int verbose;
65 static uint32_t default_sampling;
66 static uint32_t overwrite_sampling;
67
68 static extension_info_t v5_extension_info; // common for all v5 records
69 static uint16_t v5_output_record_size, v5_output_record_base_size;
70
71 // All required extension to save full v5 records
72 static uint16_t v5_full_mapp[] = { EX_IO_SNMP_2, EX_AS_2, EX_MULIPLE, EX_NEXT_HOP_v4, EX_ROUTER_IP_v4, EX_ROUTER_ID, EX_RECEIVED, 0 };
73
74 // to simplify, assume blocks with 64 bit counters to check for enough buffer space
75 // regardless if 32 or 64 bit packet/byte counters
76 #define V5_BLOCK_DATA_SIZE (sizeof(ipv4_block_t) - sizeof(uint32_t) + 2 * sizeof(uint64_t))
77
78 typedef struct exporter_v5_s {
79 // identical to exporter_t
80 struct exporter_v5_s *next;
81
82 // exporter information
83 exporter_info_record_t info;
84
85 uint64_t packets; // number of packets sent by this exporter
86 uint64_t flows; // number of flow records sent by this exporter
87 uint32_t sequence_failure; // number of sequence failues
88 uint32_t padding_errors; // number of sequence failues
89
90 // sampler
91 sampler_t *sampler;
92 // end of exporter_t
93
94 // sequence vars
95 int64_t last_sequence;
96 int64_t sequence, distance;
97 int64_t last_count;
98
99 int first;
100
101 // extension map
102 extension_map_t *extension_map;
103
104 } exporter_v5_t;
105
106 // for sending netflow v5
107 static netflow_v5_header_t *v5_output_header;
108 static netflow_v5_record_t *v5_output_record;
109 static exporter_v5_t output_engine;
110
111 static inline exporter_v5_t *GetExporter(FlowSource_t *fs, netflow_v5_header_t *header);
112
113 static inline int CheckBufferSpace(nffile_t *nffile, size_t required);
114
115 /* functions */
116
117 #include "nffile_inline.c"
118
Init_v5_v7_input(int v,uint32_t sampling,uint32_t overwrite)119 int Init_v5_v7_input(int v, uint32_t sampling, uint32_t overwrite) {
120 int i, id, map_index;
121 int extension_size;
122 uint16_t map_size;
123
124 verbose = v;
125 default_sampling = sampling;
126 overwrite_sampling = overwrite;
127 extension_size = 0;
128
129 // prepare v5 extension map
130 v5_extension_info.map = NULL;
131 v5_extension_info.next = NULL;
132 v5_extension_info.offset_cache = NULL;
133 v5_extension_info.ref_count = 0;
134
135 // default map - 0 extensions
136 map_size = sizeof(extension_map_t);
137 i=0;
138 while ( (id = v5_full_mapp[i]) != 0 ) {
139 if ( extension_descriptor[id].enabled ) {
140 extension_size += extension_descriptor[id].size;
141 map_size += sizeof(uint16_t);
142 }
143 i++;
144 }
145 // extension_size contains the sum of all optional extensions
146 // caculate the record size without counters!
147 v5_output_record_base_size = COMMON_RECORD_DATA_SIZE + 8 + extension_size; // + 8 for 2 x IPv4 addr
148
149 // align 32 bits
150 if ( ( map_size & 0x3 ) != 0 )
151 map_size += 2;
152
153 // Create a v5 extension map
154 v5_extension_info.map = (extension_map_t *)malloc((size_t)map_size);
155 if ( !v5_extension_info.map ) {
156 LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
157 return 0;
158 }
159 v5_extension_info.map->type = ExtensionMapType;
160 v5_extension_info.map->size = map_size;
161 v5_extension_info.map->map_id = INIT_ID;
162 v5_extension_info.map->extension_size = extension_size;
163
164 // see netflow_v5_v7.h for extension map description
165 map_index = 0;
166 i=0;
167 while ( (id = v5_full_mapp[i]) != 0 ) {
168 if ( extension_descriptor[id].enabled )
169 v5_extension_info.map->ex_id[map_index++] = id;
170 i++;
171 }
172 v5_extension_info.map->ex_id[map_index] = 0;
173
174 return 1;
175 } // End of Init_v5_input
176
177 /*
178 * functions used for receiving netflow v5 records
179 */
180
181
GetExporter(FlowSource_t * fs,netflow_v5_header_t * header)182 static inline exporter_v5_t *GetExporter(FlowSource_t *fs, netflow_v5_header_t *header) {
183 exporter_v5_t **e = (exporter_v5_t **)&(fs->exporter_data);
184 sampler_t *sampler;
185 uint16_t engine_tag = ntohs(header->engine_tag);
186 uint16_t version = ntohs(header->version);
187 #define IP_STRING_LEN 40
188 char ipstr[IP_STRING_LEN];
189
190 // search the appropriate exporter engine
191 while ( *e ) {
192 if ( (*e)->info.version == version && (*e)->info.id == engine_tag &&
193 (*e)->info.ip.V6[0] == fs->ip.V6[0] && (*e)->info.ip.V6[1] == fs->ip.V6[1])
194 return *e;
195 e = &((*e)->next);
196 }
197
198 // nothing found
199 *e = (exporter_v5_t *)malloc(sizeof(exporter_v5_t));
200 if ( !(*e)) {
201 LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
202 return NULL;
203 }
204 memset((void *)(*e), 0, sizeof(exporter_v5_t));
205 (*e)->next = NULL;
206 (*e)->info.header.type = ExporterInfoRecordType;
207 (*e)->info.header.size = sizeof(exporter_info_record_t);
208 (*e)->info.version = version;
209 (*e)->info.id = engine_tag;
210 (*e)->info.ip = fs->ip;
211 (*e)->info.sa_family = fs->sa_family;
212 (*e)->sequence_failure = 0;
213 (*e)->padding_errors = 0;
214 (*e)->packets = 0;
215 (*e)->flows = 0;
216 (*e)->first = 1;
217
218 sampler = (sampler_t *)malloc(sizeof(sampler_t));
219 if ( !sampler ) {
220 LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
221 return NULL;
222 }
223 (*e)->sampler = sampler;
224
225 sampler->info.header.type = SamplerInfoRecordype;
226 sampler->info.header.size = sizeof(sampler_info_record_t);
227 sampler->info.id = -1;
228 sampler->info.mode = (0xC000 & ntohs(header->sampling_interval)) >> 14;
229 sampler->info.interval = 0x3fff & ntohs(header->sampling_interval);
230 sampler->next = NULL;
231
232 // default is global default_sampling ( user defined or unsampled => 1 )
233 if ( sampler->info.interval == 0 )
234 sampler->info.interval = default_sampling;
235
236 // copy the v5 extension map
237 (*e)->extension_map = (extension_map_t *)malloc(v5_extension_info.map->size);
238 if ( !(*e)->extension_map ) {
239 LogError("Process_v5: malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror (errno));
240 free(*e);
241 *e = NULL;
242 return NULL;
243 }
244 memcpy((void *)(*e)->extension_map, (void *)v5_extension_info.map, v5_extension_info.map->size);
245
246 if ( !AddExtensionMap(fs, (*e)->extension_map) ) {
247 // bad - we must free this map and fail - otherwise data can not be read any more
248 free((*e)->extension_map);
249 free(*e);
250 *e = NULL;
251 return NULL;
252 }
253
254 (*e)->info.sysid = 0;
255 FlushInfoExporter(fs, &((*e)->info));
256 sampler->info.exporter_sysid = (*e)->info.sysid;
257 FlushInfoSampler(fs, &(sampler->info));
258
259 if ( fs->sa_family == AF_INET ) {
260 uint32_t _ip = htonl(fs->ip.V4);
261 inet_ntop(AF_INET, &_ip, ipstr, sizeof(ipstr));
262 } else if ( fs->sa_family == AF_INET6 ) {
263 uint64_t _ip[2];
264 _ip[0] = htonll(fs->ip.V6[0]);
265 _ip[1] = htonll(fs->ip.V6[1]);
266 inet_ntop(AF_INET6, &_ip, ipstr, sizeof(ipstr));
267 } else {
268 strncpy(ipstr, "<unknown>", IP_STRING_LEN);
269 }
270
271
272
273 dbg_printf("New Exporter: v5 SysID: %u, Extension ID: %i, IP: %s, Sampling Mode: %i, Sampling Interval: %u\n",
274 (*e)->info.sysid, (*e)->extension_map->map_id, ipstr, sampler->info.mode ,sampler->info.interval);
275 LogInfo("Process_v5: New exporter: SysID: %u, engine id %u, type %u, IP: %s, Sampling Mode: %i, Sampling Interval: %u\n",
276 (*e)->info.sysid, ( engine_tag & 0xFF ),( (engine_tag >> 8) & 0xFF ), ipstr, sampler->info.mode ,sampler->info.interval );
277
278 if ( overwrite_sampling > 0 ) {
279 sampler->info.interval = overwrite_sampling;
280 LogInfo("Process_v5: Hard overwrite sampling rate: %u\n", sampler->info.interval);
281 }
282
283 return (*e);
284
285 } // End of GetExporter
286
Process_v5_v7(void * in_buff,ssize_t in_buff_cnt,FlowSource_t * fs)287 void Process_v5_v7(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) {
288 netflow_v5_header_t *v5_header;
289 netflow_v5_record_t *v5_record;
290 exporter_v5_t *exporter;
291 extension_map_t *extension_map;
292 common_record_t *common_record;
293 uint64_t start_time, end_time, boot_time;
294 uint32_t First, Last;
295 uint16_t count;
296 uint8_t flags;
297 int i, done, version, flow_record_length;
298 ssize_t size_left;
299 char *string;
300
301 /*
302 * v7 is treated as v5. It differes only in the record length, for what we process.
303 */
304
305 // map v5 data structure to input buffer
306 v5_header = (netflow_v5_header_t *)in_buff;
307
308 exporter = GetExporter(fs, v5_header);
309 if ( !exporter ) {
310 LogError("Process_v5: Exporter NULL: Abort v5/v7 record processing");
311 return;
312 }
313 exporter->packets++;
314
315 // calculate record size depending on counter size
316 // sigh .. one day I should fix switch to 64bits
317 if ( exporter->sampler->info.interval == 1 ) {
318 flags = 0;
319 v5_output_record_size = v5_output_record_base_size + 8; // 2 x 4 byte counters
320 } else {
321 flags = 0;
322 SetFlag(flags, FLAG_SAMPLED);
323 SetFlag(flags, FLAG_PKG_64);
324 SetFlag(flags, FLAG_BYTES_64);
325 v5_output_record_size = v5_output_record_base_size + 16; // 2 x 8 byte counters
326 }
327
328 extension_map = exporter->extension_map;
329
330 version = ntohs(v5_header->version);
331 flow_record_length = version == 5 ? NETFLOW_V5_RECORD_LENGTH : NETFLOW_V7_RECORD_LENGTH;
332
333 // this many data to process
334 size_left = in_buff_cnt;
335
336 common_record = fs->nffile->buff_ptr;
337 done = 0;
338 while ( !done ) {
339 ipv4_block_t *ipv4_block;
340
341 /* Process header */
342
343 // count check
344 count = ntohs(v5_header->count);
345 if ( count > NETFLOW_V5_MAX_RECORDS ) {
346 LogError("Process_v5: Unexpected record count in header: %i. Abort v5/v7 record processing", count);
347 fs->nffile->buff_ptr = (void *)common_record;
348 return;
349 }
350
351 // input buffer size check for all expected records
352 if ( size_left < ( NETFLOW_V5_HEADER_LENGTH + count * flow_record_length) ) {
353 LogError("Process_v5: Not enough data to process v5 record. Abort v5/v7 record processing");
354 fs->nffile->buff_ptr = (void *)common_record;
355 return;
356 }
357
358 // output buffer size check for all expected records
359 if ( !CheckBufferSpace(fs->nffile, count * v5_output_record_size) ) {
360 // fishy! - should never happen. maybe disk full?
361 LogError("Process_v5: output buffer size error. Abort v5/v7 record processing");
362 return;
363 }
364
365 // map output record to memory buffer
366 common_record = (common_record_t *)fs->nffile->buff_ptr;
367 ipv4_block = (ipv4_block_t *)common_record->data;
368
369 // sequence check
370 if ( exporter->first ) {
371 exporter->last_sequence = ntohl(v5_header->flow_sequence);
372 exporter->sequence = exporter->last_sequence;
373 exporter->first = 0;
374 } else {
375 exporter->last_sequence = exporter->sequence;
376 exporter->sequence = ntohl(v5_header->flow_sequence);
377 exporter->distance = exporter->sequence - exporter->last_sequence;
378 // handle overflow
379 if (exporter->distance < 0) {
380 exporter->distance = 0xffffffff + exporter->distance +1;
381 }
382 if (exporter->distance != exporter->last_count) {
383 #define delta(a,b) ( (a)>(b) ? (a)-(b) : (b)-(a) )
384 fs->nffile->stat_record->sequence_failure++;
385 exporter->sequence_failure++;
386 /*
387 LogError("Flow v%d sequence last:%llu now:%llu mismatch. Missing: dist:%lu flows",
388 version, exporter->last_sequence, exporter->sequence, exporter->distance);
389 */
390
391 }
392 }
393 exporter->last_count = count;
394
395 v5_header->SysUptime = ntohl(v5_header->SysUptime);
396 v5_header->unix_secs = ntohl(v5_header->unix_secs);
397 v5_header->unix_nsecs = ntohl(v5_header->unix_nsecs);
398
399 /* calculate boot time in msec */
400 boot_time = ((uint64_t)(v5_header->unix_secs)*1000 +
401 ((uint64_t)(v5_header->unix_nsecs) / 1000000) ) - (uint64_t)(v5_header->SysUptime);
402
403 // process all records
404 v5_record = (netflow_v5_record_t *)((pointer_addr_t)v5_header + NETFLOW_V5_HEADER_LENGTH);
405
406 /* loop over each records associated with this header */
407 for (i = 0; i < count; i++) {
408 pointer_addr_t bsize;
409 uint64_t packets, bytes;
410 void *data_ptr;
411 uint8_t *s1, *s2;
412 int j, id;
413 // header data
414 common_record->flags = flags;
415 common_record->type = CommonRecordType;
416 common_record->exporter_sysid = exporter->info.sysid;;
417 common_record->ext_map = extension_map->map_id;
418 common_record->size = v5_output_record_size;
419
420 // v5 common fields
421 common_record->srcport = ntohs(v5_record->srcport);
422 common_record->dstport = ntohs(v5_record->dstport);
423 common_record->tcp_flags = v5_record->tcp_flags;
424 common_record->prot = v5_record->prot;
425 common_record->tos = v5_record->tos;
426 common_record->fwd_status = 0;
427 common_record->reserved = 0;
428
429 // v5 typed data as fixed struct v5_block
430 ipv4_block->srcaddr = ntohl(v5_record->srcaddr);
431 ipv4_block->dstaddr = ntohl(v5_record->dstaddr);
432
433 if ( exporter->sampler->info.interval == 1 ) {
434 value32_t *v = (value32_t *)ipv4_block->data;
435
436 packets = (uint64_t)ntohl(v5_record->dPkts);
437 bytes = (uint64_t)ntohl(v5_record->dOctets);
438
439 v->val = packets;
440 v = (value32_t *)v->data;
441 v->val = bytes;
442
443 data_ptr = (void *)v->data;
444 } else {
445 value64_t *v = (value64_t *)ipv4_block->data;
446 uint32_t *ptr = (uint32_t *)&packets;
447
448 packets = (uint64_t)ntohl(v5_record->dPkts) * (uint64_t)exporter->sampler->info.interval;
449 bytes = (uint64_t)ntohl(v5_record->dOctets) * (uint64_t)exporter->sampler->info.interval;
450
451 // pack packets in 32bit chunks
452 v->val.val32[0] = ptr[0];
453 v->val.val32[1] = ptr[1];
454
455 // pack bytes in 32bit chunks
456 v = (value64_t *)v->data;
457 ptr = (uint32_t *)&bytes;
458 v->val.val32[0] = ptr[0];
459 v->val.val32[1] = ptr[1];
460
461 data_ptr = (void *)v->data;
462 }
463
464 // process optional extensions
465 j = 0;
466 while ( (id = extension_map->ex_id[j]) != 0 ) {
467 switch (id) {
468 case EX_IO_SNMP_2: { // 2 byte input/output interface index
469 tpl_ext_4_t *tpl = (tpl_ext_4_t *)data_ptr;
470 tpl->input = ntohs(v5_record->input);
471 tpl->output = ntohs(v5_record->output);
472 data_ptr = (void *)tpl->data;
473 } break;
474 case EX_AS_2: { // 2 byte src/dst AS number
475 tpl_ext_6_t *tpl = (tpl_ext_6_t *)data_ptr;
476 tpl->src_as = ntohs(v5_record->src_as);
477 tpl->dst_as = ntohs(v5_record->dst_as);
478 data_ptr = (void *)tpl->data;
479 } break;
480 case EX_MULIPLE: { // dst tos, direction, src/dst mask
481 tpl_ext_8_t *tpl = (tpl_ext_8_t *)data_ptr;
482 tpl->dst_tos = 0;
483 tpl->dir = 0;
484 tpl->src_mask = v5_record->src_mask;
485 tpl->dst_mask = v5_record->dst_mask;
486 data_ptr = (void *)tpl->data;
487 } break;
488 case EX_NEXT_HOP_v4: { // IPv4 next hop
489 tpl_ext_9_t *tpl = (tpl_ext_9_t *)data_ptr;
490 tpl->nexthop = ntohl(v5_record->nexthop);
491 data_ptr = (void *)tpl->data;
492 } break;
493 case EX_ROUTER_IP_v4: { // IPv4 router address
494 tpl_ext_23_t *tpl = (tpl_ext_23_t *)data_ptr;
495 tpl->router_ip = fs->ip.V4;
496 data_ptr = (void *)tpl->data;
497 ClearFlag(common_record->flags, FLAG_IPV6_EXP);
498 } break;
499 case EX_ROUTER_ID: { // engine type, engine ID
500 tpl_ext_25_t *tpl = (tpl_ext_25_t *)data_ptr;
501 uint16_t engine_tag = ntohs(v5_header->engine_tag);
502 tpl->engine_type = (engine_tag >> 8) & 0xFF;
503 tpl->engine_id = (engine_tag & 0xFF);
504 data_ptr = (void *)tpl->data;
505 } break;
506 case EX_RECEIVED: {
507 tpl_ext_27_t *tpl = (tpl_ext_27_t *)data_ptr;
508 tpl->received = (uint64_t)((uint64_t)fs->received.tv_sec * 1000LL) + (uint64_t)((uint64_t)fs->received.tv_usec / 1000LL);
509 data_ptr = (void *)tpl->data;
510 } break;
511
512 default:
513 // this should never happen, as v5 has no other extensions
514 LogError("Process_v5: Unexpected extension %i for v5 record. Skip extension", id);
515 }
516 j++;
517 }
518
519 // Time issues
520 First = ntohl(v5_record->First);
521 Last = ntohl(v5_record->Last);
522
523 #ifdef FIXTIMEBUG
524 /*
525 * Some users report, that they see flows, which have duration time of about 40days
526 * which is almost the overflow value. Investigating this, it cannot be an overflow
527 * and the difference is always 15160 or 15176 msec too little for a classical
528 * overflow. Therefore assume this must be an exporter bug
529 */
530 if ( First > Last && ( (First - Last) < 20000) ) {
531 uint32_t _t;
532 LogError("Process_v5: Unexpected time swap: First 0x%llx smaller than boot time: 0x%llx", start_time, boot_time);
533 _t= First;
534 First = Last;
535 Last = _t;
536 }
537 #endif
538 if ( First > Last ) {
539 /* First in msec, in case of msec overflow, between start and end */
540 start_time = boot_time - 0x100000000LL + (uint64_t)First;
541 } else {
542 start_time = boot_time + (uint64_t)First;
543 }
544
545 /* end time in msecs */
546 end_time = (uint64_t)Last + boot_time;
547
548 // if overflow happened after flow ended but before got exported
549 // the additional check > 100000 is required due to a CISCO IOS bug
550 // CSCei12353 - thanks to Bojan
551 if ( Last > v5_header->SysUptime && (( Last - v5_header->SysUptime) > 100000)) {
552 start_time -= 0x100000000LL;
553 end_time -= 0x100000000LL;
554 }
555
556 common_record->first = start_time/1000;
557 common_record->msec_first = start_time - common_record->first*1000;
558
559 common_record->last = end_time/1000;
560 common_record->msec_last = end_time - common_record->last*1000;
561
562 // update first_seen, last_seen
563 if ( start_time < fs->first_seen )
564 fs->first_seen = start_time;
565 if ( end_time > fs->last_seen )
566 fs->last_seen = end_time;
567
568
569 // Update stats
570 switch (common_record->prot) {
571 case IPPROTO_ICMP:
572 fs->nffile->stat_record->numflows_icmp++;
573 fs->nffile->stat_record->numpackets_icmp += packets;
574 fs->nffile->stat_record->numbytes_icmp += bytes;
575 // fix odd CISCO behaviour for ICMP port/type in src port
576 if ( common_record->srcport != 0 ) {
577 s1 = (uint8_t *)&(common_record->srcport);
578 s2 = (uint8_t *)&(common_record->dstport);
579 s2[0] = s1[1];
580 s2[1] = s1[0];
581 common_record->srcport = 0;
582 }
583 break;
584 case IPPROTO_TCP:
585 fs->nffile->stat_record->numflows_tcp++;
586 fs->nffile->stat_record->numpackets_tcp += packets;
587 fs->nffile->stat_record->numbytes_tcp += bytes;
588 break;
589 case IPPROTO_UDP:
590 fs->nffile->stat_record->numflows_udp++;
591 fs->nffile->stat_record->numpackets_udp += packets;
592 fs->nffile->stat_record->numbytes_udp += bytes;
593 break;
594 default:
595 fs->nffile->stat_record->numflows_other++;
596 fs->nffile->stat_record->numpackets_other += packets;
597 fs->nffile->stat_record->numbytes_other += bytes;
598 }
599 exporter->flows++;
600 fs->nffile->stat_record->numflows++;
601 fs->nffile->stat_record->numpackets += packets;
602 fs->nffile->stat_record->numbytes += bytes;
603
604 if ( verbose ) {
605 master_record_t master_record;
606 memset((void *)&master_record, 0, sizeof(master_record_t));
607 ExpandRecord_v2((common_record_t *)common_record, &v5_extension_info, &(exporter->info), &master_record);
608 flow_record_to_raw(&master_record, &string, 0);
609 printf("%s\n", string);
610 }
611
612 // advance to next input flow record
613 v5_record = (netflow_v5_record_t *)((pointer_addr_t)v5_record + flow_record_length);
614
615 if ( ((pointer_addr_t)data_ptr - (pointer_addr_t)common_record) != v5_output_record_size ) {
616 printf("Panic size check: ptr diff: %llu, record size: %u\n",
617 (unsigned long long)((pointer_addr_t)data_ptr - (pointer_addr_t)common_record), v5_output_record_size );
618 abort();
619 }
620 // advance to next output record
621 common_record = (common_record_t *)data_ptr;
622 ipv4_block = (ipv4_block_t *)common_record->data;
623
624 // buffer size sanity check - should never happen, but check it anyway
625 bsize = (pointer_addr_t)common_record - (pointer_addr_t)fs->nffile->block_header - sizeof(data_block_header_t);
626 if ( bsize >= BUFFSIZE ) {
627 LogError("### Software error ###: %s line %d", __FILE__, __LINE__);
628 LogError("Process_v5: Output buffer overflow! Flush buffer and skip records.");
629 LogError("Buffer size: size: %u, bsize: %llu > %u", fs->nffile->block_header->size, (unsigned long long)bsize, BUFFSIZE);
630 // reset buffer
631 fs->nffile->block_header->size = 0;
632 fs->nffile->block_header->NumRecords = 0;
633 fs->nffile->buff_ptr = (void *)((pointer_addr_t)fs->nffile->block_header + sizeof(data_block_header_t) );
634 return;
635 }
636
637 } // End of foreach v5 record
638
639 // update file record size ( -> output buffer size )
640 fs->nffile->block_header->NumRecords += count;
641 fs->nffile->block_header->size += count * v5_output_record_size;
642 fs->nffile->buff_ptr = (void *)common_record;
643
644 // still to go for this many input bytes
645 size_left -= NETFLOW_V5_HEADER_LENGTH + count * flow_record_length;
646
647 // next header
648 v5_header = (netflow_v5_header_t *)v5_record;
649
650 // should never be < 0
651 done = size_left <= 0;
652
653 } // End of while !done
654
655 return;
656
657 } /* End of Process_v5 */
658
659 /*
660 * functions used for sending netflow v5 records
661 */
Init_v5_v7_output(send_peer_t * peer)662 void Init_v5_v7_output(send_peer_t *peer) {
663
664 v5_output_header = (netflow_v5_header_t *)peer->send_buffer;
665 v5_output_header->version = htons(5);
666 v5_output_header->SysUptime = 0;
667 v5_output_header->unix_secs = 0;
668 v5_output_header->unix_nsecs = 0;
669 v5_output_header->count = 0;
670 output_engine.first = 1;
671
672 output_engine.sequence = 0;
673 output_engine.last_sequence = 0;
674 output_engine.last_count = 0;
675 output_engine.sequence_failure = 0;
676 v5_output_record = (netflow_v5_record_t *)((pointer_addr_t)v5_output_header + (pointer_addr_t)sizeof(netflow_v5_header_t));
677
678 } // End of Init_v5_v7_output
679
Add_v5_output_record(master_record_t * master_record,send_peer_t * peer)680 int Add_v5_output_record(master_record_t *master_record, send_peer_t *peer) {
681 static uint64_t boot_time; // in msec
682 static int cnt;
683 extension_map_t *extension_map = master_record->map_ref;
684 uint32_t i, id, t1, t2;
685
686 // Skip IPv6 records
687 if ( (master_record->flags & FLAG_IPV6_ADDR ) != 0 )
688 return 0;
689
690 if ( output_engine.first ) { // first time a record is added
691 // boot time is set one day back - assuming that the start time of every flow does not start ealier
692 boot_time = (uint64_t)(master_record->first - 86400)*1000;
693 v5_output_header->unix_secs = htonl(master_record->first - 86400);
694 cnt = 0;
695 output_engine.first = 0;
696 }
697 if ( cnt == 0 ) {
698 peer->buff_ptr = (void *)((pointer_addr_t)peer->send_buffer + NETFLOW_V5_HEADER_LENGTH);
699 v5_output_record = (netflow_v5_record_t *)((pointer_addr_t)v5_output_header + (pointer_addr_t)sizeof(netflow_v5_header_t));
700 output_engine.sequence = output_engine.last_sequence + output_engine.last_count;
701 v5_output_header->flow_sequence = htonl(output_engine.sequence);
702 output_engine.last_sequence = output_engine.sequence;
703 }
704
705 t1 = (uint32_t)(1000LL * (uint64_t)master_record->first + (uint64_t)master_record->msec_first - boot_time);
706 t2 = (uint32_t)(1000LL * (uint64_t)master_record->last + (uint64_t)master_record->msec_last - boot_time);
707 v5_output_record->First = htonl(t1);
708 v5_output_record->Last = htonl(t2);
709
710 v5_output_record->srcaddr = htonl(master_record->V4.srcaddr);
711 v5_output_record->dstaddr = htonl(master_record->V4.dstaddr);
712
713 v5_output_record->srcport = htons(master_record->srcport);
714 v5_output_record->dstport = htons(master_record->dstport);
715 v5_output_record->tcp_flags = master_record->tcp_flags;
716 v5_output_record->prot = master_record->prot;
717 v5_output_record->tos = master_record->tos;
718
719 // the 64bit counters are cut down to 32 bits for v5
720 v5_output_record->dPkts = htonl((uint32_t)master_record->dPkts);
721 v5_output_record->dOctets = htonl((uint32_t)master_record->dOctets);
722
723 v5_output_record->input = htons(master_record->input);
724 v5_output_record->output = htons(master_record->output);
725 v5_output_record->src_as = htons(master_record->srcas);
726 v5_output_record->dst_as = htons(master_record->dstas);
727 v5_output_record->src_mask = 0;
728 v5_output_record->dst_mask = 0;
729 v5_output_record->pad1 = 0;
730 v5_output_record->pad2 = 0;
731 v5_output_record->nexthop = 0;
732
733 i = 0;
734 while ( (id = extension_map->ex_id[i]) != 0 ) {
735 switch (id) {
736 case EX_IO_SNMP_2:
737 v5_output_record->input = htons(master_record->input);
738 v5_output_record->output = htons(master_record->output);
739 break;
740 case EX_AS_2:
741 v5_output_record->src_as = htons(master_record->srcas);
742 v5_output_record->dst_as = htons(master_record->dstas);
743 break;
744 case EX_MULIPLE:
745 v5_output_record->src_mask = master_record->src_mask;
746 v5_output_record->dst_mask = master_record->dst_mask;
747 break;
748 case EX_NEXT_HOP_v4:
749 v5_output_record->nexthop = htonl(master_record->ip_nexthop.V4);
750 break;
751 // default: Other extensions can not be sent with v5
752 }
753 i++;
754 }
755 cnt++;
756
757 v5_output_header->count = htons(cnt);
758 peer->buff_ptr = (void *)((pointer_addr_t)peer->buff_ptr + NETFLOW_V5_RECORD_LENGTH);
759 v5_output_record++;
760 if ( cnt == NETFLOW_V5_MAX_RECORDS ) {
761 peer->flush = 1;
762 output_engine.last_count = cnt;
763 cnt = 0;
764 }
765
766 return 0;
767
768 } // End of Add_v5_output_record
769