#include <iostream>
#include <sys/types.h>
#include <inttypes.h>

#include "sflow_collector.h"

// sflowtool-3.32
#include "sflow.h"
// custom sFLOW data structures
#include "sflow_data.h"

#include "../fast_library.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <arpa/inet.h>

// UDP server
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <setjmp.h>
#include <stdlib.h>
// close() for the listening socket
#include <unistd.h>

// log4cpp logging facility
#include "log4cpp/Category.hh"
#include "log4cpp/Appender.hh"
#include "log4cpp/FileAppender.hh"
#include "log4cpp/OstreamAppender.hh"
#include "log4cpp/Layout.hh"
#include "log4cpp/BasicLayout.hh"
#include "log4cpp/PatternLayout.hh"
#include "log4cpp/Priority.hh"
36 
37 #ifdef ENABLE_LUA_HOOKS
38 lua_State* sflow_lua_state = NULL;
39 
40 bool sflow_lua_hooks_enabled = false;
41 std::string sflow_lua_hooks_path = "/usr/src/fastnetmon/src/sflow_hooks.lua";
42 #endif
43 
44 // Ethertype of outer tag in QinQ
45 uint32_t sflow_qinq_ethertype = 0x8100;
46 
47 // Disable QinQ processing by default
48 bool sflow_qinq_process = false;
49 
50 // sFLOW v4 specification: http://www.sflow.org/rfc3176.txt
51 
52 std::string plugin_name = "sflow";
53 std::string plugin_log_prefix = plugin_name + ": ";
54 
55 // Get logger from main program
56 extern log4cpp::Category& logger;
57 
58 // Global configuration map
59 extern std::map<std::string, std::string> configuration_map;
60 
61 // Enable debug messages in log
62 bool debug_sflow_parser = false;
63 
64 uint32_t getData32(SFSample* sample);
65 bool skipTLVRecord(SFSample* sample, uint32_t tag, uint32_t len);
66 bool readFlowSample(SFSample* sample, int expanded);
67 void readFlowSample_header(SFSample* sample);
68 void decode_ipv4_protocol(SFSample* sample);
69 void decode_ipv6_protocol(SFSample* sample);
70 void print_simple_packet(struct simple_packet& packet);
71 
72 process_packet_pointer sflow_process_func_ptr = NULL;
73 
74 // #include <sys/prctl.h>
75 
76 void start_sflow_collector(std::string interface_for_binding, unsigned int sflow_port);
77 
start_sflow_collection(process_packet_pointer func_ptr)78 void start_sflow_collection(process_packet_pointer func_ptr) {
79     std::string interface_for_binding = "0.0.0.0";
80     std::string sflow_ports = "";
81 
82     logger << log4cpp::Priority::INFO << plugin_log_prefix << "plugin started";
83     // prctl(PR_SET_NAME,"fastnetmon_sflow", 0, 0, 0);
84 
85     sflow_process_func_ptr = func_ptr;
86 
87     if (configuration_map.count("sflow_port") != 0) {
88         sflow_ports = configuration_map["sflow_port"];
89     }
90 
91     if (configuration_map.count("sflow_host") != 0) {
92         interface_for_binding = configuration_map["sflow_host"];
93     }
94 
95     if (configuration_map.count("sflow_qinq_process") != 0) {
96         if (configuration_map["sflow_qinq_process"] == "on") {
97 
98             sflow_qinq_process = true;
99             logger << log4cpp::Priority::INFO << plugin_log_prefix << "qinq processing enabled";
100 
101             if (configuration_map.count("sflow_qinq_ethertype") != 0) {
102                 if (convert_hex_as_string_to_uint(configuration_map["sflow_qinq_ethertype"], sflow_qinq_ethertype)) {
103                     logger << log4cpp::Priority::WARN << plugin_log_prefix
104                            << "can't parse value in sflow_qinq_ethertype variable";
105                     logger << log4cpp::Priority::INFO << plugin_log_prefix
106                            << "disable qinq processing";
107                     sflow_qinq_process = false;
108                 }
109             }
110         }
111     }
112 
113 #ifdef ENABLE_LUA_HOOKS
114     if (configuration_map.count("sflow_lua_hooks_path") != 0) {
115         sflow_lua_hooks_path = configuration_map["sflow_lua_hooks_path"];
116 
117         sflow_lua_hooks_enabled = true;
118     }
119 #endif
120 
121 #ifdef ENABLE_LUA_HOOKS
122     if (sflow_lua_hooks_enabled) {
123         sflow_lua_state = init_lua_jit(sflow_lua_hooks_path);
124 
125         if (sflow_lua_state == NULL) {
126             sflow_lua_hooks_enabled = false;
127         }
128     }
129 #endif
130 
131     boost::thread_group sflow_collector_threads;
132 
133     std::vector<std::string> ports_for_listen;
134     boost::split(ports_for_listen, sflow_ports, boost::is_any_of(","), boost::token_compress_on);
135 
136     logger << log4cpp::Priority::INFO << plugin_log_prefix << "We will listen on " << ports_for_listen.size() << " ports";
137 
138     for (std::vector<std::string>::iterator port = ports_for_listen.begin(); port != ports_for_listen.end(); ++port) {
139         unsigned int sflow_port = convert_string_to_integer(*port);
140 
141         if (sflow_port == 0) {
142             sflow_port = 6343;
143         }
144 
145         sflow_collector_threads.add_thread( new  boost::thread(start_sflow_collector,
146             interface_for_binding,
147             sflow_port
148         ));
149     }
150 
151     sflow_collector_threads.join_all();
152 }
153 
start_sflow_collector(std::string interface_for_binding,unsigned int sflow_port)154 void start_sflow_collector(std::string interface_for_binding, unsigned int sflow_port) {
155 
156     logger << log4cpp::Priority::INFO << plugin_log_prefix << "plugin will listen on " << interface_for_binding
157            << ":" << sflow_port << " udp port";
158 
159     unsigned int udp_buffer_size = 65536;
160     char udp_buffer[udp_buffer_size];
161 
162     int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
163 
164     struct sockaddr_in servaddr;
165     memset(&servaddr, 0, sizeof(servaddr));
166 
167     servaddr.sin_family = AF_INET;
168 
169     if (interface_for_binding == "0.0.0.0") {
170         servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
171     } else {
172         servaddr.sin_addr.s_addr = inet_addr(interface_for_binding.c_str());
173     }
174 
175     servaddr.sin_port = htons(sflow_port);
176     int bind_result = bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
177 
178     if (bind_result) {
179         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "can't listen port: " << sflow_port;
180         return;
181     }
182 
183     struct sockaddr_in6 peer;
184     memset(&peer, 0, sizeof(peer));
185 
186     /* We should specify timeout there for correct toolkit shutdown */
187     /* Because otherwise recvfrom will stay in blocked mode forever */
188     struct timeval tv;
189     tv.tv_sec  = 5;  /* X Secs Timeout */
190     tv.tv_usec = 0;  // Not init'ing this can cause strange errors
191 
192     setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
193 
194     while (true) {
195         struct sockaddr_in cliaddr;
196         socklen_t address_len = sizeof(cliaddr);
197 
198         int received_bytes =
199         recvfrom(sockfd, udp_buffer, udp_buffer_size, 0, (struct sockaddr*)&cliaddr, &address_len);
200 
201         if (received_bytes > 0) {
202             // printf("We receive %d\n", received_bytes);
203 
204             SFSample sample;
205             memset(&sample, 0, sizeof(sample));
206             sample.rawSample = (uint8_t*)udp_buffer;
207             sample.rawSampleLen = received_bytes;
208 
209             if (address_len == sizeof(struct sockaddr_in)) {
210                 struct sockaddr_in* peer4 = (struct sockaddr_in*)&cliaddr;
211                 sample.sourceIP.type = SFLADDRESSTYPE_IP_V4;
212                 memcpy(&sample.sourceIP.address.ip_v4, &peer4->sin_addr, 4);
213 
214                 read_sflow_datagram(&sample);
215             } else {
216                 // We do not support an IPv6
217             }
218         } else {
219             if (received_bytes == -1) {
220 
221                 if (errno == EAGAIN) {
222                     // We got timeout, it's OK!
223                 } else {
224                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "data receive failed";
225                 }
226             }
227         }
228 
229         // Add interruption point for correct application shutdown
230         boost::this_thread::interruption_point();
231     }
232 }
233 
getData32_nobswap(SFSample * sample)234 uint32_t getData32_nobswap(SFSample* sample) {
235     uint32_t ans = *(sample->datap)++;
236     // make sure we didn't run off the end of the datagram.  Thanks to
237     // Sven Eschenberg for spotting a bug/overrun-vulnerabilty that was here before.
238     if ((uint8_t*)sample->datap > sample->endp) {
239         // SFABORT(sample, SF_ABORT_EOS);
240         // Error!!!
241         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we tried to read data in bad place! Fault!";
242         return 0;
243     }
244 
245     return ans;
246 }
247 
skipBytes(SFSample * sample,uint32_t skip)248 bool skipBytes(SFSample* sample, uint32_t skip) {
249     int quads = (skip + 3) / 4;
250     sample->datap += quads;
251     if (skip > sample->rawSampleLen || (uint8_t*)sample->datap > sample->endp) {
252         // SFABORT(sample, SF_ABORT_EOS);
253         logger << log4cpp::Priority::ERROR << plugin_log_prefix
254             << "very dangerous error from skipBytes function! We try to read from restricted memory region";
255 
256         return false;
257     }
258 
259     return true;
260 }
261 
getAddress(SFSample * sample,SFLAddress * address)262 uint32_t getAddress(SFSample* sample, SFLAddress* address) {
263     address->type = getData32(sample);
264     if (address->type == SFLADDRESSTYPE_IP_V4) {
265         address->address.ip_v4.addr = getData32_nobswap(sample);
266     } else {
267         memcpy(&address->address.ip_v6.addr, sample->datap, 16);
268         skipBytes(sample, 16);
269     }
270 
271     return address->type;
272 }
273 
getData32(SFSample * sample)274 uint32_t getData32(SFSample* sample) {
275     return ntohl(getData32_nobswap(sample));
276 }
277 
readFlowSample_v2v4(SFSample * sample)278 bool readFlowSample_v2v4(SFSample *sample) {
279     sample->samplesGenerated = getData32(sample);
280 
281     uint32_t samplerId = getData32(sample);
282     sample->ds_class = samplerId >> 24;
283     sample->ds_index = samplerId & 0x00ffffff;
284 
285     sample->meanSkipCount = getData32(sample);
286     sample->samplePool = getData32(sample);
287     sample->dropEvents = getData32(sample);
288     sample->inputPort = getData32(sample);
289     sample->outputPort = getData32(sample);
290 
291     sample->packet_data_tag = getData32(sample);
292 
293     switch(sample->packet_data_tag) {
294 
295         case INMPACKETTYPE_HEADER:
296             readFlowSample_header(sample);
297 
298             break;
299         case INMPACKETTYPE_IPV4:
300             logger << log4cpp::Priority::ERROR << plugin_log_prefix << "hit INMPACKETTYPE_IPV4, very strange";
301             return false;
302 
303             break;
304         case INMPACKETTYPE_IPV6:
305             logger << log4cpp::Priority::ERROR << plugin_log_prefix << "hit INMPACKETTYPE_IPV6, very strange";
306             return false;
307 
308             break;
309         default:
310             logger << log4cpp::Priority::ERROR << plugin_log_prefix << "unexpected packet_data_tag";
311             return false;
312 
313             break;
314     }
315 
316     sample->extended_data_tag = 0;
317 
318     // We should read this data
319     sample->num_extended = getData32(sample);
320 
321     if (sample->num_extended > 0) {
322         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we have " << sample->num_extended << " extended fields";
323         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "and sorry we haven't support for it :(";
324 
325         return false;
326     }
327 
328     return true;
329 }
330 
read_sflow_datagram(SFSample * sample)331 void read_sflow_datagram(SFSample* sample) {
332     sample->datap = (uint32_t*)sample->rawSample;
333     sample->endp = (uint8_t*)sample->rawSample + sample->rawSampleLen;
334 
335     sample->datagramVersion = getData32(sample);
336     // printf("sFLOW version %d\n", sample->datagramVersion);
337 
338     if (sample->datagramVersion != 5 && sample->datagramVersion != 4) {
339         logger << log4cpp::Priority::ERROR
340                << plugin_log_prefix
341                << "we do not support sFLOW v<< "<< sample->datagramVersion
342                << " because it's too old. Please change version to sFLOW 4 or 5";
343         return;
344     }
345 
346     /* get the agent address */
347     getAddress(sample, &sample->agent_addr);
348 
349     /* version 5 has an agent sub-id as well */
350     if (sample->datagramVersion >= 5) {
351         sample->agentSubId = getData32(sample);
352         // sf_log(sample,"agentSubId %u\n", sample->agentSubId);
353     } else {
354         sample->agentSubId = 0;
355     }
356 
357     sample->sequenceNo = getData32(sample); /* this is the packet sequence number */
358     sample->sysUpTime = getData32(sample);
359     uint32_t samplesInPacket = getData32(sample);
360 
361     // printf("We have %d samples in packet\n", samplesInPacket);
362 
363     uint32_t samp = 0;
364     for (; samp < samplesInPacket; samp++) {
365         if ((uint8_t*)sample->datap >= sample->endp) {
366             logger
367             << log4cpp::Priority::INFO
368             << plugin_log_prefix
369             << "we tried to read data outside packet! It's very dangerous, we stop all operations";
370             return;
371         }
372 
373         // printf("Sample #%d\n", samp);
374 
375         /* just read the tag, then call the approriate decode fn */
376         sample->sampleType = getData32(sample);
377         if (sample->datagramVersion >= 5) {
378             switch (sample->sampleType) {
379             case SFLFLOW_SAMPLE:
380                 // skipBytes(sample, getData32(sample));
381                 if (!readFlowSample(sample, 0)) {
382                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLFLOW_SAMPLE handler";
383                     return;
384                 }
385 
386                 break;
387             case SFLCOUNTERS_SAMPLE:
388                 // We do not need counters for our task, skip it
389                 if (!skipBytes(sample, getData32(sample))) {
390                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLCOUNTERS_SAMPLE handler";
391                     return;
392                 }
393 
394                 break;
395             case SFLFLOW_SAMPLE_EXPANDED:
396                 // skipBytes(sample, getData32(sample));
397                 if (!readFlowSample(sample, 1)) {
398                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLFLOW_SAMPLE_EXPANDED handler";
399                     return;
400                 }
401 
402                 break;
403             case SFLCOUNTERS_SAMPLE_EXPANDED:
404                 // We do not need counters for our task, skip it
405                 if (!skipBytes(sample, getData32(sample))) {
406                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in SFLCOUNTERS_SAMPLE_EXPANDED handler";
407                     return;
408                 }
409 
410                 break;
411             default:
412                 if (!skipTLVRecord(sample, sample->sampleType, getData32(sample))) {
413                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we failed in default handler in skipTLVRecord";
414                     return;
415                 }
416                 break;
417             }
418         } else {
419             // sFLOW v2 or v4 here
420             switch(sample->sampleType) {
421                 case FLOWSAMPLE:
422                     if (!readFlowSample_v2v4(sample)) {
423                         // We have some troubles with old sFLOW parser
424                         return;
425                     }
426                     break;
427                 case COUNTERSSAMPLE:
428                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't support for COUNTERSSAMPLE for "
429                         << "sFLOW v4 and ignore it completely";
430                     return;
431                     break;
432                 default:
433                     logger << log4cpp::Priority::ERROR << plugin_log_prefix << "unexpected sample type: " << sample->sampleType;
434                     return;
435                     break;
436             }
437         }
438     }
439 }
440 
skipTLVRecord(SFSample * sample,uint32_t tag,uint32_t len)441 bool skipTLVRecord(SFSample* sample, uint32_t tag, uint32_t len) {
442     return skipBytes(sample, len);
443 }
444 
445 
length_check(SFSample * sample,const char * description,uint8_t * start,int len)446 bool length_check(SFSample *sample, const char *description, uint8_t *start, int len) {
447     uint32_t actualLen = (uint8_t *)sample->datap - start;
448     uint32_t adjustedLen = ((len + 3) >> 2) << 2;
449 
450     if (actualLen != adjustedLen) {
451         logger << log4cpp::Priority::ERROR << plugin_log_prefix << description
452             << " length error: expected " << len << " found " << actualLen;
453         return false;
454     }
455 
456     return true;
457 }
458 
readFlowSample(SFSample * sample,int expanded)459 bool readFlowSample(SFSample* sample, int expanded) {
460     uint32_t num_elements, sampleLength;
461     uint8_t* sampleStart;
462 
463     sampleLength = getData32(sample);
464     sampleStart = (uint8_t*)sample->datap;
465     sample->samplesGenerated = getData32(sample);
466 
467     if (expanded) {
468         sample->ds_class = getData32(sample);
469         sample->ds_index = getData32(sample);
470     } else {
471         uint32_t samplerId = getData32(sample);
472         sample->ds_class = samplerId >> 24;
473         sample->ds_index = samplerId & 0x00ffffff;
474     }
475 
476     sample->meanSkipCount = getData32(sample);
477     // printf("Sample ratio: %d\n", sample->meanSkipCount);
478     sample->samplePool = getData32(sample);
479     sample->dropEvents = getData32(sample);
480 
481     if (expanded) {
482         sample->inputPortFormat = getData32(sample);
483         sample->inputPort = getData32(sample);
484         sample->outputPortFormat = getData32(sample);
485         sample->outputPort = getData32(sample);
486     } else {
487         uint32_t inp, outp;
488         inp = getData32(sample);
489         outp = getData32(sample);
490         sample->inputPortFormat = inp >> 30;
491         sample->outputPortFormat = outp >> 30;
492         sample->inputPort = inp & 0x3fffffff;
493         sample->outputPort = outp & 0x3fffffff;
494     }
495 
496     num_elements = getData32(sample);
497     uint32_t el;
498     for (el = 0; el < num_elements; el++) {
499         uint32_t tag, length;
500         uint8_t* start;
501         char buf[51];
502         tag = sample->elementType = getData32(sample);
503 
504         length = getData32(sample);
505         start = (uint8_t*)sample->datap;
506 
507         // tag analyze
508         if (tag == SFLFLOW_HEADER) {
509             // process data
510             readFlowSample_header(sample);
511         } else {
512             if (!skipTLVRecord(sample, tag, length)) {
513                 return false;
514             }
515         }
516 
517         if (!length_check(sample, "flow_sample_element", start, length)) {
518             return false;
519         }
520     }
521 
522     if (!length_check(sample, "flow_sample", sampleStart, sampleLength)) {
523         return false;
524     }
525 
526     return true;
527 }
528 
529 #define NFT_ETHHDR_SIZ 14
530 #define NFT_8022_SIZ 3
531 #define NFT_MAX_8023_LEN 1500
532 
533 #define NFT_MIN_SIZ (NFT_ETHHDR_SIZ + sizeof(struct myiphdr))
534 
decode_link_layer(SFSample * sample)535 void decode_link_layer(SFSample* sample) {
536     uint8_t* start = (uint8_t*)sample->header;
537     uint8_t* end = start + sample->headerLen;
538     uint8_t* ptr = start;
539     uint16_t type_len;
540 
541     /* assume not found */
542     sample->gotIPV4 = 0;
543     sample->gotIPV6 = 0;
544 
545     if (sample->headerLen < NFT_ETHHDR_SIZ) {
546         /* not enough for an Ethernet header */
547         return;
548     }
549 
550     // sf_log(sample,"dstMAC %02x%02x%02x%02x%02x%02x\n", ptr[0], ptr[1], ptr[2], ptr[3], ptr[4],
551     // ptr[5]);
552     memcpy(sample->eth_dst, ptr, 6);
553     ptr += 6;
554 
555     // sf_log(sample,"srcMAC %02x%02x%02x%02x%02x%02x\n", ptr[0], ptr[1], ptr[2], ptr[3], ptr[4],
556     // ptr[5]);
557     memcpy(sample->eth_src, ptr, 6);
558     ptr += 6;
559     type_len = (ptr[0] << 8) + ptr[1];
560     ptr += 2;
561 
562     if (sflow_qinq_process && type_len == sflow_qinq_ethertype && ((ptr[2] << 8) + ptr[3]) == 0x8100) {
563         /* Outer VLAN tag - next two bytes */
564         uint32_t vlanData = (ptr[0] << 8) + ptr[1];
565         uint32_t vlan = vlanData & 0x0fff;
566         uint32_t priority = vlanData >> 13;
567         ptr += 2;
568 
569         sample->in_outer_vlan = vlan;
570         /* now get the type_len again (next two bytes) */
571         type_len = (ptr[0] << 8) + ptr[1];
572         ptr += 2;
573     }
574 
575     if (type_len == 0x8100) {
576         /* Inner VLAN tag - next two bytes */
577         uint32_t vlanData = (ptr[0] << 8) + ptr[1];
578         uint32_t vlan = vlanData & 0x0fff;
579         uint32_t priority = vlanData >> 13;
580         ptr += 2;
581 
582         /*  _____________________________________ */
583         /* |   pri  | c |         vlan-id        | */
584         /*  ------------------------------------- */
585         /* [priority = 3bits] [Canonical Format Flag = 1bit] [vlan-id = 12 bits] */
586         // sf_log(sample,"decodedVLAN %u\n", vlan);
587         // sf_log(sample,"decodedPriority %u\n", priority);
588         sample->in_vlan = vlan;
589         /* now get the type_len again (next two bytes) */
590         type_len = (ptr[0] << 8) + ptr[1];
591         ptr += 2;
592     }
593 
594     /* assume type_len is an ethernet-type now */
595     sample->eth_type = type_len;
596 
597     if (type_len == 0x0800) {
598         /* IPV4 */
599         if ((end - ptr) < sizeof(struct myiphdr)) {
600             return;
601         }
602 
603         /* look at first byte of header.... */
604         /*  ___________________________ */
605         /* |   version   |    hdrlen   | */
606         /*  --------------------------- */
607 
608         if ((*ptr >> 4) != 4) return; /* not version 4 */
609         if ((*ptr & 15) < 5) return; /* not IP (hdr len must be 5 quads or more) */
610 
611         /* survived all the tests - store the offset to the start of the ip header */
612         sample->gotIPV4 = 1;
613         sample->offsetToIPV4 = (ptr - start);
614     }
615 
616     if (type_len == 0x86DD) {
617         /* IPV6 */
618         /* look at first byte of header.... */
619 
620         if ((*ptr >> 4) != 6) return; /* not version 6 */
621 
622         /* survived all the tests - store the offset to the start of the ip6 header */
623         sample->gotIPV6 = 1;
624         sample->offsetToIPV6 = (ptr - start);
625 
626         printf("IPv6\n");
627     }
628 
629     // printf("vlan: %d\n",sample->in_vlan);
630 }
631 
readFlowSample_header(SFSample * sample)632 void readFlowSample_header(SFSample* sample) {
633     sample->headerProtocol = getData32(sample);
634     sample->sampledPacketSize = getData32(sample);
635 
636     if (sample->datagramVersion > 4) {
637         /* stripped count introduced in sFlow version 5 */
638         sample->stripped = getData32(sample);
639     }
640 
641     sample->headerLen = getData32(sample);
642     sample->header = (uint8_t*)sample->datap; /* just point at the header */
643     skipBytes(sample, sample->headerLen);
644 
645     if (sample->headerProtocol == SFLHEADER_ETHERNET_ISO8023) {
646         // Detect IPv4 or IPv6 here
647         decode_link_layer(sample);
648 
649         // Process IP packets next
650         if (sample->gotIPV4) {
651             decode_ipv4_protocol(sample);
652         }
653 
654         if (sample->gotIPV6) {
655             decode_ipv6_protocol(sample);
656         }
657     } else {
658         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "not supported protocol: " << sample->headerProtocol;
659         return;
660     }
661 }
662 
// Formats an IPv4 address as dotted decimal text.
// The four bytes are printed in the order in which they are stored in memory.
// The caller provides the buffer; the longest result needs 16 bytes including the
// terminating zero. Returns buf.
char* IP_to_a(uint32_t ipaddr, char* buf) {
    unsigned char octets[sizeof(ipaddr)];
    memcpy(octets, &ipaddr, sizeof(ipaddr));

    sprintf(buf, "%u.%u.%u.%u", octets[0], octets[1], octets[2], octets[3]);
    return buf;
}
669 
printAddress(SFLAddress * address,char * buf)670 char* printAddress(SFLAddress* address, char* buf) {
671     switch (address->type) {
672     case SFLADDRESSTYPE_IP_V4:
673         IP_to_a(address->address.ip_v4.addr, buf);
674         break;
675     case SFLADDRESSTYPE_IP_V6: {
676         uint8_t* b = address->address.ip_v6.addr;
677         /* should really be: snprintf(buf, buflen,...) but snprintf() is not always available */
678         sprintf(buf, "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x",
679                 b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12],
680                 b[13], b[14], b[15]);
681     } break;
682     default:
683         sprintf(buf, "-");
684     }
685 
686     return buf;
687 }
688 
decodeIPLayer4(SFSample * sample,uint8_t * ptr)689 void decodeIPLayer4(SFSample* sample, uint8_t* ptr) {
690     uint8_t* end = sample->header + sample->headerLen;
691 
692     if (ptr > (end - 8)) {
693         /* not enough header bytes left */
694         return;
695     }
696 
697     simple_packet current_packet;
698 
699     if (sample->gotIPV6) {
700         current_packet.ip_protocol_version = 6;
701 
702         memcpy(current_packet.src_ipv6.s6_addr, sample->ipsrc.address.ip_v6.addr, 16);
703         memcpy(current_packet.dst_ipv6.s6_addr, sample->ipdst.address.ip_v6.addr, 16);
704     } else {
705         current_packet.ip_protocol_version = 4;
706 
707         current_packet.src_ip = sample->ipsrc.address.ip_v4.addr;
708         current_packet.dst_ip = sample->ipdst.address.ip_v4.addr;
709     }
710 
711     // Because sFLOW data is near real time we could get current time
712     gettimeofday(&current_packet.ts, NULL);
713 
714     current_packet.flags = 0;
715     current_packet.number_of_packets = 1;
716     current_packet.length = sample->sampledPacketSize;
717     current_packet.sample_ratio = sample->meanSkipCount;
718 
719     switch (sample->dcd_ipProtocol) {
720     case 1: {
721         // ICMP
722         current_packet.protocol = IPPROTO_ICMP;
723         struct myicmphdr icmp;
724         memcpy(&icmp, ptr, sizeof(icmp));
725         // printf("ICMPType %u\n", icmp.type);
726         // printf("ICMPCode %u\n", icmp.code);
727         sample->dcd_sport = icmp.type;
728         sample->dcd_dport = icmp.code;
729         sample->offsetToPayload = ptr + sizeof(icmp) - sample->header;
730     } break;
731     case 6: {
732         // TCP
733         current_packet.protocol = IPPROTO_TCP;
734         struct mytcphdr tcp;
735         int headerBytes;
736         memcpy(&tcp, ptr, sizeof(tcp));
737         sample->dcd_sport = ntohs(tcp.th_sport);
738         sample->dcd_dport = ntohs(tcp.th_dport);
739 
740         current_packet.source_port = sample->dcd_sport;
741         current_packet.destination_port = sample->dcd_dport;
742         // TODO: flags could be broken because our flags parser implemented with PF_RING style flags
743         // PF_RING
744         current_packet.flags = tcp.th_flags;
745 
746         sample->dcd_tcpFlags = tcp.th_flags;
747         // printf("TCPSrcPort %u\n", sample->dcd_sport);
748         // printf("TCPDstPort %u\n",sample->dcd_dport);
749         // printf("TCPFlags %u\n", sample->dcd_tcpFlags);
750         headerBytes = (tcp.th_off_and_unused >> 4) * 4;
751         ptr += headerBytes;
752         sample->offsetToPayload = ptr - sample->header;
753     } break;
754     case 17: {
755         // UDP
756         current_packet.protocol = IPPROTO_UDP;
757         struct myudphdr udp;
758         memcpy(&udp, ptr, sizeof(udp));
759         sample->dcd_sport = ntohs(udp.uh_sport);
760         sample->dcd_dport = ntohs(udp.uh_dport);
761 
762         current_packet.source_port = sample->dcd_sport;
763         current_packet.destination_port = sample->dcd_dport;
764 
765         sample->udp_pduLen = ntohs(udp.uh_ulen);
766         // printf("UDPSrcPort %u\n", sample->dcd_sport);
767         // printf("UDPDstPort %u\n", sample->dcd_dport);
768         // printf("UDPBytes %u\n", sample->udp_pduLen);
769         sample->offsetToPayload = ptr + sizeof(udp) - sample->header;
770     } break;
771     default: /* some other protcol */
772         sample->offsetToPayload = ptr - sample->header;
773         break;
774     }
775 
776 #ifdef ENABLE_LUA_HOOKS
777     //sample->inputPort  = fast_ntoh(sample->inputPort);
778     //sample->outputPort = fast_ntoh(sample->outputPort);
779 
780     if (sflow_lua_hooks_enabled) {
781         // This code could be used only for tests with pcap_reader
782         if (sflow_lua_state == NULL) {
783             sflow_lua_state = init_lua_jit(sflow_lua_hooks_path);
784         }
785 
786         if (call_lua_function("process_sflow", sflow_lua_state,
787             convert_ip_as_uint_to_string(sample->sourceIP.address.ip_v4.addr), (void*)sample)) {
788             // We will process this packet
789         } else {
790             logger << log4cpp::Priority::DEBUG << "We will drop this packets because LUA script decided to do it";
791             return;
792         }
793     }
794 #endif
795 
796     // Call external handler function
797     sflow_process_func_ptr(current_packet);
798 }
799 
decode_ipv6_protocol(SFSample * sample)800 void decode_ipv6_protocol(SFSample* sample) {
801     uint8_t *ptr = sample->header + sample->offsetToIPV6;
802     uint8_t *end = sample->header + sample->headerLen;
803 
804     int ipVersion = (*ptr >> 4);
805 
806     if (ipVersion != 6) {
807         logger << log4cpp::Priority::ERROR << plugin_log_prefix << "sFLOW header decode error: unexpected IP version: " << ipVersion;
808         return;
809     }
810 
811     /* get the tos (priority) */
812     sample->dcd_ipTos = *ptr++ & 15;
813 
814     if (debug_sflow_parser) {
815         logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPTOS: " << sample->dcd_ipTos;
816     }
817 
818     /* 24-bit label */
819     uint32_t label = *ptr++;
820     label <<= 8;
821     label += *ptr++;
822     label <<= 8;
823     label += *ptr++;
824 
825     if (debug_sflow_parser) {
826         logger << log4cpp::Priority::INFO << plugin_log_prefix << "IP6_label: " << label;
827     }
828 
829     /* payload */
830     uint16_t payloadLen = (ptr[0] << 8) + ptr[1];
831     ptr += 2;
832 
833     /* if payload is zero, that implies a jumbo payload */
834     if (debug_sflow_parser) {
835         if (payloadLen == 0) {
836             logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPV6_payloadLen <jumbo>";
837         } else {
838             logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPV6_payloadLen " << payloadLen;
839         }
840     }
841 
842     /* next header */
843     uint32_t nextHeader = *ptr++;
844 
845     /* TTL */
846     sample->dcd_ipTTL = *ptr++;
847     //sf_log(sample,"IPTTL %u\n", sample->dcd_ipTTL);
848 
849     /* src and dst address */
850     // char buf[101];
851     sample->ipsrc.type = SFLADDRESSTYPE_IP_V6;
852     memcpy(&sample->ipsrc.address, ptr, 16);
853     ptr +=16;
854 
855     if (debug_sflow_parser) {
856         char buf[101];
857         logger << log4cpp::Priority::INFO << plugin_log_prefix << "srcIP6: " << printAddress(&sample->ipsrc, buf);
858     }
859 
860     sample->ipdst.type = SFLADDRESSTYPE_IP_V6;
861     memcpy(&sample->ipdst.address, ptr, 16);
862     ptr +=16;
863 
864     if (debug_sflow_parser) {
865         char buf[101];
866         logger << log4cpp::Priority::INFO << plugin_log_prefix << "dstIP6: " << printAddress(&sample->ipdst, buf);
867     }
868 
869     /* skip over some common header extensions...
870        http://searchnetworking.techtarget.com/originalContent/0,289142,sid7_gci870277,00.html */
871     while(nextHeader == 0 ||  /* hop */
872           nextHeader == 43 || /* routing */
873           nextHeader == 44 || /* fragment */
874           /* nextHeader == 50 => encryption - don't bother coz we'll not be able to read any further */
875           nextHeader == 51 || /* auth */
876           nextHeader == 60) { /* destination options */
877 
878         uint32_t optionLen, skip;
879 
880         if (debug_sflow_parser) {
881             logger << log4cpp::Priority::INFO << plugin_log_prefix << "IP6HeaderExtension: " << nextHeader;
882         }
883 
884         nextHeader = ptr[0];
885         optionLen = 8 * (ptr[1] + 1);  /* second byte gives option len in 8-byte chunks, not counting first 8 */
886         skip = optionLen - 2;
887         ptr += skip;
888         if (ptr > end) return; /* ran off the end of the header */
889     }
890 
891     /* now that we have eliminated the extension headers, nextHeader should have what we want to
892        remember as the ip protocol... */
893     sample->dcd_ipProtocol = nextHeader;
894 
895     if (debug_sflow_parser) {
896         logger << log4cpp::Priority::INFO << plugin_log_prefix << "IPProtocol: " << sample->dcd_ipProtocol;
897     }
898 
899     decodeIPLayer4(sample, ptr);
900 }
901 
decode_ipv4_protocol(SFSample * sample)902 void decode_ipv4_protocol(SFSample* sample) {
903     char buf[51];
904     uint8_t* ptr = sample->header + sample->offsetToIPV4;
905     /* Create a local copy of the IP header (cannot overlay structure in case it is not
906         quad-aligned...some platforms would core-dump if we tried that).  It's OK coz this probably performs just as well anyway. */
907     struct myiphdr ip;
908     memcpy(&ip, ptr, sizeof(ip));
909     /* Value copy all ip elements into sample */
910     sample->ipsrc.type = SFLADDRESSTYPE_IP_V4;
911     sample->ipsrc.address.ip_v4.addr = ip.saddr;
912     sample->ipdst.type = SFLADDRESSTYPE_IP_V4;
913     sample->ipdst.address.ip_v4.addr = ip.daddr;
914     sample->dcd_ipProtocol = ip.protocol;
915     sample->dcd_ipTos = ip.tos;
916     sample->dcd_ipTTL = ip.ttl;
917 
918     // printf("ip.tot_len %d\n", ntohs(ip.tot_len));
919     /* Log out the decoded IP fields */
920     // printf("srcIP %s\n", printAddress(&sample->ipsrc, buf));
921     // printf("dstIP %s\n", printAddress(&sample->ipdst, buf));
922     // printf("IPProtocol %u\n", sample->dcd_ipProtocol);
923     // printf("IPTOS %u\n", sample->dcd_ipTos);
924     // printf("IPTTL %u\n", sample->dcd_ipTTL);
925 
926     /* check for fragments */
927     sample->ip_fragmentOffset = ntohs(ip.frag_off) & 0x1FFF;
928     if (sample->ip_fragmentOffset > 0) {
929         // printf("IPFragmentOffset %u\n", sample->ip_fragmentOffset);
930     } else {
931         /* advance the pointer to the next protocol layer */
932         /* ip headerLen is expressed as a number of quads */
933         ptr += (ip.version_and_headerLen & 0x0f) * 4;
934         decodeIPLayer4(sample, ptr);
935     }
936 }
937