#include <iostream>
#include <sys/types.h>
#include <inttypes.h>

#include "sflow_collector.h"

// sflowtool-3.32
#include "sflow.h"
// custom sFLOW data structures
#include "sflow_data.h"

#include "../fast_library.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <arpa/inet.h>

// UDP server
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <setjmp.h>
#include <stdlib.h>
#include <unistd.h>

// log4cpp logging facility
#include "log4cpp/Category.hh"
#include "log4cpp/Appender.hh"
#include "log4cpp/FileAppender.hh"
#include "log4cpp/OstreamAppender.hh"
#include "log4cpp/Layout.hh"
#include "log4cpp/BasicLayout.hh"
#include "log4cpp/PatternLayout.hh"
#include "log4cpp/Priority.hh"
36
#ifdef ENABLE_LUA_HOOKS
// Lua interpreter state used by the hooks; stays NULL until init_lua_jit() returns a state
lua_State* sflow_lua_state = NULL;

// Switched on when the sflow_lua_hooks_path option is present in the configuration
bool sflow_lua_hooks_enabled = false;
// Default hooks script location, replaced by the sflow_lua_hooks_path option when it is set
std::string sflow_lua_hooks_path = "/usr/src/fastnetmon/src/sflow_hooks.lua";
#endif

// Ethertype of outer tag in QinQ
uint32_t sflow_qinq_ethertype = 0x8100;

// Disable QinQ processing by default
bool sflow_qinq_process = false;

// sFLOW v4 specification: http://www.sflow.org/rfc3176.txt

// Plugin name and the prefix prepended to the log messages of this plugin
std::string plugin_name = "sflow";
std::string plugin_log_prefix = plugin_name + ": ";

// Get logger from main program
extern log4cpp::Category& logger;

// Global configuration map
extern std::map<std::string, std::string> configuration_map;

// Enable debug messages in log
bool debug_sflow_parser = false;

// Forward declarations of the parser functions used before their definition
uint32_t getData32(SFSample* sample);
bool skipTLVRecord(SFSample* sample, uint32_t tag, uint32_t len);
bool readFlowSample(SFSample* sample, int expanded);
void readFlowSample_header(SFSample* sample);
void decode_ipv4_protocol(SFSample* sample);
void decode_ipv6_protocol(SFSample* sample);
void print_simple_packet(struct simple_packet& packet);

// Callback which receives every decoded packet; assigned in start_sflow_collection()
process_packet_pointer sflow_process_func_ptr = NULL;

// #include <sys/prctl.h>

// Per port worker, runs in its own thread
void start_sflow_collector(std::string interface_for_binding, unsigned int sflow_port);
77
start_sflow_collection(process_packet_pointer func_ptr)78 void start_sflow_collection(process_packet_pointer func_ptr) {
79 std::string interface_for_binding = "0.0.0.0";
80 std::string sflow_ports = "";
81
82 logger << log4cpp::Priority::INFO << plugin_log_prefix << "plugin started";
83 // prctl(PR_SET_NAME,"fastnetmon_sflow", 0, 0, 0);
84
85 sflow_process_func_ptr = func_ptr;
86
87 if (configuration_map.count("sflow_port") != 0) {
88 sflow_ports = configuration_map["sflow_port"];
89 }
90
91 if (configuration_map.count("sflow_host") != 0) {
92 interface_for_binding = configuration_map["sflow_host"];
93 }
94
95 if (configuration_map.count("sflow_qinq_process") != 0) {
96 if (configuration_map["sflow_qinq_process"] == "on") {
97
98 sflow_qinq_process = true;
99 logger << log4cpp::Priority::INFO << plugin_log_prefix << "qinq processing enabled";
100
101 if (configuration_map.count("sflow_qinq_ethertype") != 0) {
102 if (convert_hex_as_string_to_uint(configuration_map["sflow_qinq_ethertype"], sflow_qinq_ethertype)) {
103 logger << log4cpp::Priority::WARN << plugin_log_prefix
104 << "can't parse value in sflow_qinq_ethertype variable";
105 logger << log4cpp::Priority::INFO << plugin_log_prefix
106 << "disable qinq processing";
107 sflow_qinq_process = false;
108 }
109 }
110 }
111 }
112
113 #ifdef ENABLE_LUA_HOOKS
114 if (configuration_map.count("sflow_lua_hooks_path") != 0) {
115 sflow_lua_hooks_path = configuration_map["sflow_lua_hooks_path"];
116
117 sflow_lua_hooks_enabled = true;
118 }
119 #endif
120
121 #ifdef ENABLE_LUA_HOOKS
122 if (sflow_lua_hooks_enabled) {
123 sflow_lua_state = init_lua_jit(sflow_lua_hooks_path);
124
125 if (sflow_lua_state == NULL) {
126 sflow_lua_hooks_enabled = false;
127 }
128 }
129 #endif
130
131 boost::thread_group sflow_collector_threads;
132
133 std::vector<std::string> ports_for_listen;
134 boost::split(ports_for_listen, sflow_ports, boost::is_any_of(","), boost::token_compress_on);
135
136 logger << log4cpp::Priority::INFO << plugin_log_prefix << "We will listen on " << ports_for_listen.size() << " ports";
137
138 for (std::vector<std::string>::iterator port = ports_for_listen.begin(); port != ports_for_listen.end(); ++port) {
139 unsigned int sflow_port = convert_string_to_integer(*port);
140
141 if (sflow_port == 0) {
142 sflow_port = 6343;
143 }
144
145 sflow_collector_threads.add_thread( new boost::thread(start_sflow_collector,
146 interface_for_binding,
147 sflow_port
148 ));
149 }
150
151 sflow_collector_threads.join_all();
152 }
153
start_sflow_collector(std::string interface_for_binding,unsigned int sflow_port)154 void start_sflow_collector(std::string interface_for_binding, unsigned int sflow_port) {
155
156 logger << log4cpp::Priority::INFO << plugin_log_prefix << "plugin will listen on " << interface_for_binding
157 << ":" << sflow_port << " udp port";
158
159 unsigned int udp_buffer_size = 65536;
160 char udp_buffer[udp_buffer_size];
161
162 int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
163
164 struct sockaddr_in servaddr;
165 memset(&servaddr, 0, sizeof(servaddr));
166
167 servaddr.sin_family = AF_INET;
168
169 if (interface_for_binding == "0.0.0.0") {
170 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
171 } else {
172 servaddr.sin_addr.s_addr = inet_addr(interface_for_binding.c_str());
173 }
174
175 servaddr.sin_port = htons(sflow_port);
176 int bind_result = bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
177
178 if (bind_result) {
179 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "can't listen port: " << sflow_port;
180 return;
181 }
182
183 struct sockaddr_in6 peer;
184 memset(&peer, 0, sizeof(peer));
185
186 /* We should specify timeout there for correct toolkit shutdown */
187 /* Because otherwise recvfrom will stay in blocked mode forever */
188 struct timeval tv;
189 tv.tv_sec = 5; /* X Secs Timeout */
190 tv.tv_usec = 0; // Not init'ing this can cause strange errors
191
192 setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
193
194 while (true) {
195 struct sockaddr_in cliaddr;
196 socklen_t address_len = sizeof(cliaddr);
197
198 int received_bytes =
199 recvfrom(sockfd, udp_buffer, udp_buffer_size, 0, (struct sockaddr*)&cliaddr, &address_len);
200
201 if (received_bytes > 0) {
202 // printf("We receive %d\n", received_bytes);
203
204 SFSample sample;
205 memset(&sample, 0, sizeof(sample));
206 sample.rawSample = (uint8_t*)udp_buffer;
207 sample.rawSampleLen = received_bytes;
208
209 if (address_len == sizeof(struct sockaddr_in)) {
210 struct sockaddr_in* peer4 = (struct sockaddr_in*)&cliaddr;
211 sample.sourceIP.type = SFLADDRESSTYPE_IP_V4;
212 memcpy(&sample.sourceIP.address.ip_v4, &peer4->sin_addr, 4);
213
214 read_sflow_datagram(&sample);
215 } else {
216 // We do not support an IPv6
217 }
218 } else {
219 if (received_bytes == -1) {
220
221 if (errno == EAGAIN) {
222 // We got timeout, it's OK!
223 } else {
224 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "data receive failed";
225 }
226 }
227 }
228
229 // Add interruption point for correct application shutdown
230 boost::this_thread::interruption_point();
231 }
232 }
233
getData32_nobswap(SFSample * sample)234 uint32_t getData32_nobswap(SFSample* sample) {
235 uint32_t ans = *(sample->datap)++;
236 // make sure we didn't run off the end of the datagram. Thanks to
237 // Sven Eschenberg for spotting a bug/overrun-vulnerabilty that was here before.
238 if ((uint8_t*)sample->datap > sample->endp) {
239 // SFABORT(sample, SF_ABORT_EOS);
240 // Error!!!
241 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we tried to read data in bad place! Fault!";
242 return 0;
243 }
244
245 return ans;
246 }
247
skipBytes(SFSample * sample,uint32_t skip)248 bool skipBytes(SFSample* sample, uint32_t skip) {
249 int quads = (skip + 3) / 4;
250 sample->datap += quads;
251 if (skip > sample->rawSampleLen || (uint8_t*)sample->datap > sample->endp) {
252 // SFABORT(sample, SF_ABORT_EOS);
253 logger << log4cpp::Priority::ERROR << plugin_log_prefix
254 << "very dangerous error from skipBytes function! We try to read from restricted memory region";
255
256 return false;
257 }
258
259 return true;
260 }
261
getAddress(SFSample * sample,SFLAddress * address)262 uint32_t getAddress(SFSample* sample, SFLAddress* address) {
263 address->type = getData32(sample);
264 if (address->type == SFLADDRESSTYPE_IP_V4) {
265 address->address.ip_v4.addr = getData32_nobswap(sample);
266 } else {
267 memcpy(&address->address.ip_v6.addr, sample->datap, 16);
268 skipBytes(sample, 16);
269 }
270
271 return address->type;
272 }
273
getData32(SFSample * sample)274 uint32_t getData32(SFSample* sample) {
275 return ntohl(getData32_nobswap(sample));
276 }
277
readFlowSample_v2v4(SFSample * sample)278 bool readFlowSample_v2v4(SFSample *sample) {
279 sample->samplesGenerated = getData32(sample);
280
281 uint32_t samplerId = getData32(sample);
282 sample->ds_class = samplerId >> 24;
283 sample->ds_index = samplerId & 0x00ffffff;
284
285 sample->meanSkipCount = getData32(sample);
286 sample->samplePool = getData32(sample);
287 sample->dropEvents = getData32(sample);
288 sample->inputPort = getData32(sample);
289 sample->outputPort = getData32(sample);
290
291 sample->packet_data_tag = getData32(sample);
292
293 switch(sample->packet_data_tag) {
294
295 case INMPACKETTYPE_HEADER:
296 readFlowSample_header(sample);
297
298 break;
299 case INMPACKETTYPE_IPV4:
300 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "hit INMPACKETTYPE_IPV4, very strange";
301 return false;
302
303 break;
304 case INMPACKETTYPE_IPV6:
305 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "hit INMPACKETTYPE_IPV6, very strange";
306 return false;
307
308 break;
309 default:
310 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "unexpected packet_data_tag";
311 return false;
312
313 break;
314 }
315
316 sample->extended_data_tag = 0;
317
318 // We should read this data
319 sample->num_extended = getData32(sample);
320
321 if (sample->num_extended > 0) {
322 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we have " << sample->num_extended << " extended fields";
323 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "and sorry we haven't support for it :(";
324
325 return false;
326 }
327
328 return true;
329 }
330
read_sflow_datagram(SFSample * sample)331 void read_sflow_datagram(SFSample* sample) {
332 sample->datap = (uint32_t*)sample->rawSample;
333 sample->endp = (uint8_t*)sample->rawSample + sample->rawSampleLen;
334
335 sample->datagramVersion = getData32(sample);
336 // printf("sFLOW version %d\n", sample->datagramVersion);
337
338 if (sample->datagramVersion != 5 && sample->datagramVersion != 4) {
339 logger << log4cpp::Priority::ERROR
340 << plugin_log_prefix
341 << "we do not support sFLOW v<< "<< sample->datagramVersion
342 << " because it's too old. Please change version to sFLOW 4 or 5";
343 return;
344 }
345
346 /* get the agent address */
347 getAddress(sample, &sample->agent_addr);
348
349 /* version 5 has an agent sub-id as well */
350 if (sample->datagramVersion >= 5) {
351 sample->agentSubId = getData32(sample);
352 // sf_log(sample,"agentSubId %u\n", sample->agentSubId);
353 } else {
354 sample->agentSubId = 0;
355 }
356
357 sample->sequenceNo = getData32(sample); /* this is the packet sequence number */
358 sample->sysUpTime = getData32(sample);
359 uint32_t samplesInPacket = getData32(sample);
360
361 // printf("We have %d samples in packet\n", samplesInPacket);
362
363 uint32_t samp = 0;
364 for (; samp < samplesInPacket; samp++) {
365 if ((uint8_t*)sample->datap >= sample->endp) {
366 logger
367 << log4cpp::Priority::INFO
368 << plugin_log_prefix
369 << "we tried to read data outside packet! It's very dangerous, we stop all operations";
370 return;
371 }
372
373 // printf("Sample #%d\n", samp);
374
375 /* just read the tag, then call the approriate decode fn */
376 sample->sampleType = getData32(sample);
377 if (sample->datagramVersion >= 5) {
378 switch (sample->sampleType) {
379 case SFLFLOW_SAMPLE:
380 // skipBytes(sample, getData32(sample));
381 if (!readFlowSample(sample, 0)) {
382 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLFLOW_SAMPLE handler";
383 return;
384 }
385
386 break;
387 case SFLCOUNTERS_SAMPLE:
388 // We do not need counters for our task, skip it
389 if (!skipBytes(sample, getData32(sample))) {
390 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLCOUNTERS_SAMPLE handler";
391 return;
392 }
393
394 break;
395 case SFLFLOW_SAMPLE_EXPANDED:
396 // skipBytes(sample, getData32(sample));
397 if (!readFlowSample(sample, 1)) {
398 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLFLOW_SAMPLE_EXPANDED handler";
399 return;
400 }
401
402 break;
403 case SFLCOUNTERS_SAMPLE_EXPANDED:
404 // We do not need counters for our task, skip it
405 if (!skipBytes(sample, getData32(sample))) {
406 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLCOUNTERS_SAMPLE_EXPANDED handler";
407 return;
408 }
409
410 break;
411 default:
412 if (!skipTLVRecord(sample, sample->sampleType, getData32(sample))) {
413 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in default handler in skipTLVRecord";
414 return;
415 }
416 break;
417 }
418 } else {
419 // sFLOW v2 or v4 here
420 switch(sample->sampleType) {
421 case FLOWSAMPLE:
422 if (!readFlowSample_v2v4(sample)) {
423 // We have some troubles with old sFLOW parser
424 return;
425 }
426 break;
427 case COUNTERSSAMPLE:
428 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't support for COUNTERSSAMPLE for "
429 << "sFLOW v4 and ignore it completely";
430 return;
431 break;
432 default:
433 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "unexpected sample type: " << sample->sampleType;
434 return;
435 break;
436 }
437 }
438 }
439 }
440
skipTLVRecord(SFSample * sample,uint32_t tag,uint32_t len)441 bool skipTLVRecord(SFSample* sample, uint32_t tag, uint32_t len) {
442 return skipBytes(sample, len);
443 }
444
445
length_check(SFSample * sample,const char * description,uint8_t * start,int len)446 bool length_check(SFSample *sample, const char *description, uint8_t *start, int len) {
447 uint32_t actualLen = (uint8_t *)sample->datap - start;
448 uint32_t adjustedLen = ((len + 3) >> 2) << 2;
449
450 if (actualLen != adjustedLen) {
451 logger << log4cpp::Priority::ERROR << plugin_log_prefix << description
452 << " length error: expected " << len << " found " << actualLen;
453 return false;
454 }
455
456 return true;
457 }
458
readFlowSample(SFSample * sample,int expanded)459 bool readFlowSample(SFSample* sample, int expanded) {
460 uint32_t num_elements, sampleLength;
461 uint8_t* sampleStart;
462
463 sampleLength = getData32(sample);
464 sampleStart = (uint8_t*)sample->datap;
465 sample->samplesGenerated = getData32(sample);
466
467 if (expanded) {
468 sample->ds_class = getData32(sample);
469 sample->ds_index = getData32(sample);
470 } else {
471 uint32_t samplerId = getData32(sample);
472 sample->ds_class = samplerId >> 24;
473 sample->ds_index = samplerId & 0x00ffffff;
474 }
475
476 sample->meanSkipCount = getData32(sample);
477 // printf("Sample ratio: %d\n", sample->meanSkipCount);
478 sample->samplePool = getData32(sample);
479 sample->dropEvents = getData32(sample);
480
481 if (expanded) {
482 sample->inputPortFormat = getData32(sample);
483 sample->inputPort = getData32(sample);
484 sample->outputPortFormat = getData32(sample);
485 sample->outputPort = getData32(sample);
486 } else {
487 uint32_t inp, outp;
488 inp = getData32(sample);
489 outp = getData32(sample);
490 sample->inputPortFormat = inp >> 30;
491 sample->outputPortFormat = outp >> 30;
492 sample->inputPort = inp & 0x3fffffff;
493 sample->outputPort = outp & 0x3fffffff;
494 }
495
496 num_elements = getData32(sample);
497 uint32_t el;
498 for (el = 0; el < num_elements; el++) {
499 uint32_t tag, length;
500 uint8_t* start;
501 char buf[51];
502 tag = sample->elementType = getData32(sample);
503
504 length = getData32(sample);
505 start = (uint8_t*)sample->datap;
506
507 // tag analyze
508 if (tag == SFLFLOW_HEADER) {
509 // process data
510 readFlowSample_header(sample);
511 } else {
512 if (!skipTLVRecord(sample, tag, length)) {
513 return false;
514 }
515 }
516
517 if (!length_check(sample, "flow_sample_element", start, length)) {
518 return false;
519 }
520 }
521
522 if (!length_check(sample, "flow_sample", sampleStart, sampleLength)) {
523 return false;
524 }
525
526 return true;
527 }
528
529 #define NFT_ETHHDR_SIZ 14
530 #define NFT_8022_SIZ 3
531 #define NFT_MAX_8023_LEN 1500
532
533 #define NFT_MIN_SIZ (NFT_ETHHDR_SIZ + sizeof(struct myiphdr))
534
decode_link_layer(SFSample * sample)535 void decode_link_layer(SFSample* sample) {
536 uint8_t* start = (uint8_t*)sample->header;
537 uint8_t* end = start + sample->headerLen;
538 uint8_t* ptr = start;
539 uint16_t type_len;
540
541 /* assume not found */
542 sample->gotIPV4 = 0;
543 sample->gotIPV6 = 0;
544
545 if (sample->headerLen < NFT_ETHHDR_SIZ) {
546 /* not enough for an Ethernet header */
547 return;
548 }
549
550 // sf_log(sample,"dstMAC %02x%02x%02x%02x%02x%02x\n", ptr[0], ptr[1], ptr[2], ptr[3], ptr[4],
551 // ptr[5]);
552 memcpy(sample->eth_dst, ptr, 6);
553 ptr += 6;
554
555 // sf_log(sample,"srcMAC %02x%02x%02x%02x%02x%02x\n", ptr[0], ptr[1], ptr[2], ptr[3], ptr[4],
556 // ptr[5]);
557 memcpy(sample->eth_src, ptr, 6);
558 ptr += 6;
559 type_len = (ptr[0] << 8) + ptr[1];
560 ptr += 2;
561
562 if (sflow_qinq_process && type_len == sflow_qinq_ethertype && ((ptr[2] << 8) + ptr[3]) == 0x8100) {
563 /* Outer VLAN tag - next two bytes */
564 uint32_t vlanData = (ptr[0] << 8) + ptr[1];
565 uint32_t vlan = vlanData & 0x0fff;
566 uint32_t priority = vlanData >> 13;
567 ptr += 2;
568
569 sample->in_outer_vlan = vlan;
570 /* now get the type_len again (next two bytes) */
571 type_len = (ptr[0] << 8) + ptr[1];
572 ptr += 2;
573 }
574
575 if (type_len == 0x8100) {
576 /* Inner VLAN tag - next two bytes */
577 uint32_t vlanData = (ptr[0] << 8) + ptr[1];
578 uint32_t vlan = vlanData & 0x0fff;
579 uint32_t priority = vlanData >> 13;
580 ptr += 2;
581
582 /* _____________________________________ */
583 /* | pri | c | vlan-id | */
584 /* ------------------------------------- */
585 /* [priority = 3bits] [Canonical Format Flag = 1bit] [vlan-id = 12 bits] */
586 // sf_log(sample,"decodedVLAN %u\n", vlan);
587 // sf_log(sample,"decodedPriority %u\n", priority);
588 sample->in_vlan = vlan;
589 /* now get the type_len again (next two bytes) */
590 type_len = (ptr[0] << 8) + ptr[1];
591 ptr += 2;
592 }
593
594 /* assume type_len is an ethernet-type now */
595 sample->eth_type = type_len;
596
597 if (type_len == 0x0800) {
598 /* IPV4 */
599 if ((end - ptr) < sizeof(struct myiphdr)) {
600 return;
601 }
602
603 /* look at first byte of header.... */
604 /* ___________________________ */
605 /* | version | hdrlen | */
606 /* --------------------------- */
607
608 if ((*ptr >> 4) != 4) return; /* not version 4 */
609 if ((*ptr & 15) < 5) return; /* not IP (hdr len must be 5 quads or more) */
610
611 /* survived all the tests - store the offset to the start of the ip header */
612 sample->gotIPV4 = 1;
613 sample->offsetToIPV4 = (ptr - start);
614 }
615
616 if (type_len == 0x86DD) {
617 /* IPV6 */
618 /* look at first byte of header.... */
619
620 if ((*ptr >> 4) != 6) return; /* not version 6 */
621
622 /* survived all the tests - store the offset to the start of the ip6 header */
623 sample->gotIPV6 = 1;
624 sample->offsetToIPV6 = (ptr - start);
625
626 printf("IPv6\n");
627 }
628
629 // printf("vlan: %d\n",sample->in_vlan);
630 }
631
readFlowSample_header(SFSample * sample)632 void readFlowSample_header(SFSample* sample) {
633 sample->headerProtocol = getData32(sample);
634 sample->sampledPacketSize = getData32(sample);
635
636 if (sample->datagramVersion > 4) {
637 /* stripped count introduced in sFlow version 5 */
638 sample->stripped = getData32(sample);
639 }
640
641 sample->headerLen = getData32(sample);
642 sample->header = (uint8_t*)sample->datap; /* just point at the header */
643 skipBytes(sample, sample->headerLen);
644
645 if (sample->headerProtocol == SFLHEADER_ETHERNET_ISO8023) {
646 // Detect IPv4 or IPv6 here
647 decode_link_layer(sample);
648
649 // Process IP packets next
650 if (sample->gotIPV4) {
651 decode_ipv4_protocol(sample);
652 }
653
654 if (sample->gotIPV6) {
655 decode_ipv6_protocol(sample);
656 }
657 } else {
658 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "not supported protocol: " << sample->headerProtocol;
659 return;
660 }
661 }
662
// Formats an IPv4 address as dotted decimal text.
//
// ipaddr - address whose bytes are printed in memory order (network byte order input
//          gives the usual notation)
// buf    - output buffer supplied by the caller, 16 bytes are enough for the longest result
//
// Returns buf.
char* IP_to_a(uint32_t ipaddr, char* buf) {
    uint8_t octets[4];
    memcpy(octets, &ipaddr, sizeof(octets));

    /* should really be: snprintf(buf, buflen,...) but snprintf() is not always available */
    sprintf(buf, "%u.%u.%u.%u", octets[0], octets[1], octets[2], octets[3]);

    return buf;
}
669
printAddress(SFLAddress * address,char * buf)670 char* printAddress(SFLAddress* address, char* buf) {
671 switch (address->type) {
672 case SFLADDRESSTYPE_IP_V4:
673 IP_to_a(address->address.ip_v4.addr, buf);
674 break;
675 case SFLADDRESSTYPE_IP_V6: {
676 uint8_t* b = address->address.ip_v6.addr;
677 /* should really be: snprintf(buf, buflen,...) but snprintf() is not always available */
678 sprintf(buf, "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x",
679 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12],
680 b[13], b[14], b[15]);
681 } break;
682 default:
683 sprintf(buf, "-");
684 }
685
686 return buf;
687 }
688
decodeIPLayer4(SFSample * sample,uint8_t * ptr)689 void decodeIPLayer4(SFSample* sample, uint8_t* ptr) {
690 uint8_t* end = sample->header + sample->headerLen;
691
692 if (ptr > (end - 8)) {
693 /* not enough header bytes left */
694 return;
695 }
696
697 simple_packet current_packet;
698
699 if (sample->gotIPV6) {
700 current_packet.ip_protocol_version = 6;
701
702 memcpy(current_packet.src_ipv6.s6_addr, sample->ipsrc.address.ip_v6.addr, 16);
703 memcpy(current_packet.dst_ipv6.s6_addr, sample->ipdst.address.ip_v6.addr, 16);
704 } else {
705 current_packet.ip_protocol_version = 4;
706
707 current_packet.src_ip = sample->ipsrc.address.ip_v4.addr;
708 current_packet.dst_ip = sample->ipdst.address.ip_v4.addr;
709 }
710
711 // Because sFLOW data is near real time we could get current time
712 gettimeofday(¤t_packet.ts, NULL);
713
714 current_packet.flags = 0;
715 current_packet.number_of_packets = 1;
716 current_packet.length = sample->sampledPacketSize;
717 current_packet.sample_ratio = sample->meanSkipCount;
718
719 switch (sample->dcd_ipProtocol) {
720 case 1: {
721 // ICMP
722 current_packet.protocol = IPPROTO_ICMP;
723 struct myicmphdr icmp;
724 memcpy(&icmp, ptr, sizeof(icmp));
725 // printf("ICMPType %u\n", icmp.type);
726 // printf("ICMPCode %u\n", icmp.code);
727 sample->dcd_sport = icmp.type;
728 sample->dcd_dport = icmp.code;
729 sample->offsetToPayload = ptr + sizeof(icmp) - sample->header;
730 } break;
731 case 6: {
732 // TCP
733 current_packet.protocol = IPPROTO_TCP;
734 struct mytcphdr tcp;
735 int headerBytes;
736 memcpy(&tcp, ptr, sizeof(tcp));
737 sample->dcd_sport = ntohs(tcp.th_sport);
738 sample->dcd_dport = ntohs(tcp.th_dport);
739
740 current_packet.source_port = sample->dcd_sport;
741 current_packet.destination_port = sample->dcd_dport;
742 // TODO: flags could be broken because our flags parser implemented with PF_RING style flags
743 // PF_RING
744 current_packet.flags = tcp.th_flags;
745
746 sample->dcd_tcpFlags = tcp.th_flags;
747 // printf("TCPSrcPort %u\n", sample->dcd_sport);
748 // printf("TCPDstPort %u\n",sample->dcd_dport);
749 // printf("TCPFlags %u\n", sample->dcd_tcpFlags);
750 headerBytes = (tcp.th_off_and_unused >> 4) * 4;
751 ptr += headerBytes;
752 sample->offsetToPayload = ptr - sample->header;
753 } break;
754 case 17: {
755 // UDP
756 current_packet.protocol = IPPROTO_UDP;
757 struct myudphdr udp;
758 memcpy(&udp, ptr, sizeof(udp));
759 sample->dcd_sport = ntohs(udp.uh_sport);
760 sample->dcd_dport = ntohs(udp.uh_dport);
761
762 current_packet.source_port = sample->dcd_sport;
763 current_packet.destination_port = sample->dcd_dport;
764
765 sample->udp_pduLen = ntohs(udp.uh_ulen);
766 // printf("UDPSrcPort %u\n", sample->dcd_sport);
767 // printf("UDPDstPort %u\n", sample->dcd_dport);
768 // printf("UDPBytes %u\n", sample->udp_pduLen);
769 sample->offsetToPayload = ptr + sizeof(udp) - sample->header;
770 } break;
771 default: /* some other protcol */
772 sample->offsetToPayload = ptr - sample->header;
773 break;
774 }
775
776 #ifdef ENABLE_LUA_HOOKS
777 //sample->inputPort = fast_ntoh(sample->inputPort);
778 //sample->outputPort = fast_ntoh(sample->outputPort);
779
780 if (sflow_lua_hooks_enabled) {
781 // This code could be used only for tests with pcap_reader
782 if (sflow_lua_state == NULL) {
783 sflow_lua_state = init_lua_jit(sflow_lua_hooks_path);
784 }
785
786 if (call_lua_function("process_sflow", sflow_lua_state,
787 convert_ip_as_uint_to_string(sample->sourceIP.address.ip_v4.addr), (void*)sample)) {
788 // We will process this packet
789 } else {
790 logger << log4cpp::Priority::DEBUG << "We will drop this packets because LUA script decided to do it";
791 return;
792 }
793 }
794 #endif
795
796 // Call external handler function
797 sflow_process_func_ptr(current_packet);
798 }
799
decode_ipv6_protocol(SFSample * sample)800 void decode_ipv6_protocol(SFSample* sample) {
801 uint8_t *ptr = sample->header + sample->offsetToIPV6;
802 uint8_t *end = sample->header + sample->headerLen;
803
804 int ipVersion = (*ptr >> 4);
805
806 if (ipVersion != 6) {
807 logger << log4cpp::Priority::ERROR << plugin_log_prefix << "sFLOW header decode error: unexpected IP version: " << ipVersion;
808 return;
809 }
810
811 /* get the tos (priority) */
812 sample->dcd_ipTos = *ptr++ & 15;
813
814 if (debug_sflow_parser) {
815 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPTOS: " << sample->dcd_ipTos;
816 }
817
818 /* 24-bit label */
819 uint32_t label = *ptr++;
820 label <<= 8;
821 label += *ptr++;
822 label <<= 8;
823 label += *ptr++;
824
825 if (debug_sflow_parser) {
826 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IP6_label: " << label;
827 }
828
829 /* payload */
830 uint16_t payloadLen = (ptr[0] << 8) + ptr[1];
831 ptr += 2;
832
833 /* if payload is zero, that implies a jumbo payload */
834 if (debug_sflow_parser) {
835 if (payloadLen == 0) {
836 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPV6_payloadLen <jumbo>";
837 } else {
838 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPV6_payloadLen " << payloadLen;
839 }
840 }
841
842 /* next header */
843 uint32_t nextHeader = *ptr++;
844
845 /* TTL */
846 sample->dcd_ipTTL = *ptr++;
847 //sf_log(sample,"IPTTL %u\n", sample->dcd_ipTTL);
848
849 /* src and dst address */
850 // char buf[101];
851 sample->ipsrc.type = SFLADDRESSTYPE_IP_V6;
852 memcpy(&sample->ipsrc.address, ptr, 16);
853 ptr +=16;
854
855 if (debug_sflow_parser) {
856 char buf[101];
857 logger << log4cpp::Priority::INFO << plugin_log_prefix << "srcIP6: " << printAddress(&sample->ipsrc, buf);
858 }
859
860 sample->ipdst.type = SFLADDRESSTYPE_IP_V6;
861 memcpy(&sample->ipdst.address, ptr, 16);
862 ptr +=16;
863
864 if (debug_sflow_parser) {
865 char buf[101];
866 logger << log4cpp::Priority::INFO << plugin_log_prefix << "dstIP6: " << printAddress(&sample->ipdst, buf);
867 }
868
869 /* skip over some common header extensions...
870 http://searchnetworking.techtarget.com/originalContent/0,289142,sid7_gci870277,00.html */
871 while(nextHeader == 0 || /* hop */
872 nextHeader == 43 || /* routing */
873 nextHeader == 44 || /* fragment */
874 /* nextHeader == 50 => encryption - don't bother coz we'll not be able to read any further */
875 nextHeader == 51 || /* auth */
876 nextHeader == 60) { /* destination options */
877
878 uint32_t optionLen, skip;
879
880 if (debug_sflow_parser) {
881 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IP6HeaderExtension: " << nextHeader;
882 }
883
884 nextHeader = ptr[0];
885 optionLen = 8 * (ptr[1] + 1); /* second byte gives option len in 8-byte chunks, not counting first 8 */
886 skip = optionLen - 2;
887 ptr += skip;
888 if (ptr > end) return; /* ran off the end of the header */
889 }
890
891 /* now that we have eliminated the extension headers, nextHeader should have what we want to
892 remember as the ip protocol... */
893 sample->dcd_ipProtocol = nextHeader;
894
895 if (debug_sflow_parser) {
896 logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPProtocol: " << sample->dcd_ipProtocol;
897 }
898
899 decodeIPLayer4(sample, ptr);
900 }
901
decode_ipv4_protocol(SFSample * sample)902 void decode_ipv4_protocol(SFSample* sample) {
903 char buf[51];
904 uint8_t* ptr = sample->header + sample->offsetToIPV4;
905 /* Create a local copy of the IP header (cannot overlay structure in case it is not
906 quad-aligned...some platforms would core-dump if we tried that). It's OK coz this probably performs just as well anyway. */
907 struct myiphdr ip;
908 memcpy(&ip, ptr, sizeof(ip));
909 /* Value copy all ip elements into sample */
910 sample->ipsrc.type = SFLADDRESSTYPE_IP_V4;
911 sample->ipsrc.address.ip_v4.addr = ip.saddr;
912 sample->ipdst.type = SFLADDRESSTYPE_IP_V4;
913 sample->ipdst.address.ip_v4.addr = ip.daddr;
914 sample->dcd_ipProtocol = ip.protocol;
915 sample->dcd_ipTos = ip.tos;
916 sample->dcd_ipTTL = ip.ttl;
917
918 // printf("ip.tot_len %d\n", ntohs(ip.tot_len));
919 /* Log out the decoded IP fields */
920 // printf("srcIP %s\n", printAddress(&sample->ipsrc, buf));
921 // printf("dstIP %s\n", printAddress(&sample->ipdst, buf));
922 // printf("IPProtocol %u\n", sample->dcd_ipProtocol);
923 // printf("IPTOS %u\n", sample->dcd_ipTos);
924 // printf("IPTTL %u\n", sample->dcd_ipTTL);
925
926 /* check for fragments */
927 sample->ip_fragmentOffset = ntohs(ip.frag_off) & 0x1FFF;
928 if (sample->ip_fragmentOffset > 0) {
929 // printf("IPFragmentOffset %u\n", sample->ip_fragmentOffset);
930 } else {
931 /* advance the pointer to the next protocol layer */
932 /* ip headerLen is expressed as a number of quads */
933 ptr += (ip.version_and_headerLen & 0x0f) * 4;
934 decodeIPLayer4(sample, ptr);
935 }
936 }
937