1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* Copyright (c) 2002-2014 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
3 /* http://www.inmon.com/technology/sflowlicense.txt */
4 
5 #include "ngx_http_sflow_api.h"
6 
7 /* ===================================================*/
8 /* ===================== AGENT =======================*/
9 
10 
11 static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler);
12 static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler);
13 
14 /*_________________---------------------------__________________
15   _________________       error logging       __________________
16   -----------------___________________________------------------
17 */
18 #define MAX_ERRMSG_LEN 1000
19 
sfl_agent_error(SFLAgent * agent,char * modName,char * msg)20 void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
21 {
22     char errm[MAX_ERRMSG_LEN];
23     snprintf(errm, MAX_ERRMSG_LEN, "sfl_agent_error: %s: %s\n", modName, msg);
24     if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
25 }
26 
27 /*________________--------------------------__________________
28   ________________    sfl_agent_init        __________________
29   ----------------__________________________------------------
30 */
31 
sfl_agent_init(SFLAgent * agent,SFLAddress * myIP,uint32_t subId,time_t bootTime,time_t now,void * magic,errorFn_t errorFn,sendFn_t sendFn)32 void sfl_agent_init(SFLAgent *agent,
33                     SFLAddress *myIP, /* IP address of this agent in net byte order */
34                     uint32_t subId,  /* agent_sub_id */
35                     time_t bootTime,  /* agent boot time */
36                     time_t now,       /* time now */
37                     void *magic,      /* ptr to pass back in logging and alloc fns */
38                     errorFn_t errorFn,
39                     sendFn_t sendFn)
40 {
41     /* first clear everything */
42     memset(agent, 0, sizeof(*agent));
43     /* now copy in the parameters */
44     agent->myIP = *myIP; /* structure copy */
45     agent->subId = subId;
46     agent->bootTime = bootTime;
47     agent->now = now;
48     agent->magic = magic;
49     agent->errorFn = errorFn;
50     agent->sendFn = sendFn;
51 }
52 
53 /*_________________---------------------------__________________
54   _________________   sfl_agent_tick          __________________
55   -----------------___________________________------------------
56 */
57 
sfl_agent_tick(SFLAgent * agent,time_t now)58 void sfl_agent_tick(SFLAgent *agent, time_t now)
59 {
60     SFLReceiver *rcv;
61     SFLSampler *sm;
62     SFLPoller *pl;
63 
64     agent->now = now;
65     /* receivers use ticks to flush send data */
66     for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) sfl_receiver_tick(rcv, now);
67     /* samplers use ticks to decide when they are sampling too fast */
68     for( sm = agent->samplers; sm != NULL; sm = sm->nxt) sfl_sampler_tick(sm, now);
69     /* pollers use ticks to decide when to ask for counters */
70     for( pl = agent->pollers; pl != NULL; pl = pl->nxt) sfl_poller_tick(pl, now);
71 }
72 
73 /*_________________---------------------------__________________
74   _________________   sfl_agent_addReceiver   __________________
75   -----------------___________________________------------------
76 */
77 
sfl_agent_addReceiver(SFLAgent * agent,SFLReceiver * rcv)78 SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent, SFLReceiver *rcv)
79 {
80     SFLReceiver *r, *prev;
81 
82     prev = NULL;
83     sfl_receiver_init(rcv, agent);
84     /* add to end of list - to preserve the receiver index numbers for existing receivers */
85 
86     for(r = agent->receivers; r != NULL; prev = r, r = r->nxt);
87     if(prev) prev->nxt = rcv;
88     else agent->receivers = rcv;
89     rcv->nxt = NULL;
90     return rcv;
91 }
92 
93 /*_________________---------------------------__________________
94   _________________     sfl_dsi_compare       __________________
95   -----------------___________________________------------------
96 
97   Note that if there is a mixture of ds_classes for this agent, then
98   the simple numeric comparison may not be correct - the sort order (for
99   the purposes of the SNMP MIB) should really be determined by the OID
100   that these numeric ds_class numbers are a shorthand for.  For example,
101   ds_class == 0 means ifIndex, which is the oid "1.3.6.1.2.1.2.2.1"
102 */
103 
sfl_dsi_compare(SFLDataSource_instance * pdsi1,SFLDataSource_instance * pdsi2)104 static int sfl_dsi_compare(SFLDataSource_instance *pdsi1, SFLDataSource_instance *pdsi2) {
105     /* could have used just memcmp(),  but not sure if that would */
106     /* give the right answer on little-endian platforms. Safer to be explicit... */
107     int cmp = pdsi2->ds_class - pdsi1->ds_class;
108     if(cmp == 0) cmp = pdsi2->ds_index - pdsi1->ds_index;
109     if(cmp == 0) cmp = pdsi2->ds_instance - pdsi1->ds_instance;
110     return cmp;
111 }
112 
113 /*_________________---------------------------__________________
114   _________________   sfl_agent_addSampler    __________________
115   -----------------___________________________------------------
116 */
117 
sfl_agent_addSampler(SFLAgent * agent,SFLDataSource_instance * pdsi,SFLSampler * newsm)118 SFLSampler *sfl_agent_addSampler(SFLAgent *agent,
119                                  SFLDataSource_instance *pdsi,
120                                  SFLSampler *newsm)
121 {
122     SFLSampler *prev, *sm, *test;
123 
124     prev = NULL;
125     sm = agent->samplers;
126     /* keep the list sorted */
127     for(; sm != NULL; prev = sm, sm = sm->nxt) {
128         int64_t cmp = sfl_dsi_compare(pdsi, &sm->dsi);
129         if(cmp == 0) return sm;  /* found - return existing one */
130         if(cmp < 0) break;       /* insert here */
131     }
132     /* either we found the insert point, or reached the end of the list... */
133     sfl_sampler_init(newsm, agent, pdsi);
134     if(prev) prev->nxt = newsm;
135     else agent->samplers = newsm;
136     newsm->nxt = sm;
137 
138     /* see if we should go in the ifIndex jumpTable */
139     if(SFL_DS_CLASS(newsm->dsi) == 0) {
140         test = sfl_agent_getSamplerByIfIndex(agent, SFL_DS_INDEX(newsm->dsi));
141         if(test && (SFL_DS_INSTANCE(newsm->dsi) < SFL_DS_INSTANCE(test->dsi))) {
142             /* replace with this new one because it has a lower ds_instance number */
143             sfl_agent_jumpTableRemove(agent, test);
144             test = NULL;
145         }
146         if(test == NULL) sfl_agent_jumpTableAdd(agent, newsm);
147     }
148     return newsm;
149 }
150 
151 /*_________________---------------------------__________________
152   _________________   sfl_agent_addPoller     __________________
153   -----------------___________________________------------------
154 */
155 
sfl_agent_addPoller(SFLAgent * agent,SFLDataSource_instance * pdsi,void * magic,getCountersFn_t getCountersFn,SFLPoller * newpl)156 SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
157                                SFLDataSource_instance *pdsi,
158                                void *magic,         /* ptr to pass back in getCountersFn() */
159                                getCountersFn_t getCountersFn,
160                                SFLPoller *newpl)
161 {
162     /* keep the list sorted */
163     SFLPoller *prev = NULL, *pl = agent->pollers;
164     for(; pl != NULL; prev = pl, pl = pl->nxt) {
165         int64_t cmp = sfl_dsi_compare(pdsi, &pl->dsi);
166         if(cmp == 0) return pl;  /* found - return existing one */
167         if(cmp < 0) break;       /* insert here */
168     }
169     /* either we found the insert point, or reached the end of the list... */
170     sfl_poller_init(newpl, agent, pdsi, magic, getCountersFn);
171     if(prev) prev->nxt = newpl;
172     else agent->pollers = newpl;
173     newpl->nxt = pl;
174     return newpl;
175 }
176 
177 /*_________________--------------------------------__________________
178   _________________  sfl_agent_jumpTableAdd        __________________
179   -----------------________________________________------------------
180 */
181 
sfl_agent_jumpTableAdd(SFLAgent * agent,SFLSampler * sampler)182 static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler)
183 {
184     uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
185     sampler->hash_nxt = agent->jumpTable[hashIndex];
186     agent->jumpTable[hashIndex] = sampler;
187 }
188 
189 /*_________________--------------------------------__________________
190   _________________  sfl_agent_jumpTableRemove     __________________
191   -----------------________________________________------------------
192 */
193 
sfl_agent_jumpTableRemove(SFLAgent * agent,SFLSampler * sampler)194 static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler)
195 {
196     uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
197     SFLSampler *search = agent->jumpTable[hashIndex], *prev = NULL;
198     for( ; search != NULL; prev = search, search = search->hash_nxt) if(search == sampler) break;
199     if(search) {
200         /* found - unlink */
201         if(prev) prev->hash_nxt = search->hash_nxt;
202         else agent->jumpTable[hashIndex] = search->hash_nxt;
203         search->hash_nxt = NULL;
204     }
205 }
206 
207 /*_________________--------------------------------__________________
208   _________________  sfl_agent_getSamplerByIfIndex __________________
209   -----------------________________________________------------------
210   fast lookup (pointers cached in hash table).  If there are multiple
211   sampler instances for a given ifIndex, then this fn will return
212   the one with the lowest instance number.  Since the samplers
213   list is sorted, this means the other instances will be accesible
214   by following the sampler->nxt pointer (until the ds_class
215   or ds_index changes).  This is helpful if you need to offer
216   the same flowSample to multiple samplers.
217 */
218 
sfl_agent_getSamplerByIfIndex(SFLAgent * agent,uint32_t ifIndex)219 SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, uint32_t ifIndex)
220 {
221     SFLSampler *search = agent->jumpTable[ifIndex % SFL_HASHTABLE_SIZ];
222     for( ; search != NULL; search = search->hash_nxt) if(SFL_DS_INDEX(search->dsi) == ifIndex) break;
223     return search;
224 }
225 
226 /*_________________---------------------------__________________
227   _________________  sfl_agent_getSampler     __________________
228   -----------------___________________________------------------
229 */
230 
sfl_agent_getSampler(SFLAgent * agent,SFLDataSource_instance * pdsi)231 SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
232 {
233     SFLSampler *sm;
234 
235     /* find it and return it */
236     for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
237         if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) return sm;
238     /* not found */
239     return NULL;
240 }
241 
242 /*_________________---------------------------__________________
243   _________________  sfl_agent_getPoller      __________________
244   -----------------___________________________------------------
245 */
246 
sfl_agent_getPoller(SFLAgent * agent,SFLDataSource_instance * pdsi)247 SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
248 {
249     SFLPoller *pl;
250 
251     /* find it and return it */
252     for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
253         if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) return pl;
254     /* not found */
255     return NULL;
256 }
257 
258 /*_________________---------------------------__________________
259   _________________  sfl_agent_getReceiver    __________________
260   -----------------___________________________------------------
261 */
262 
sfl_agent_getReceiver(SFLAgent * agent,uint32_t receiverIndex)263 SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, uint32_t receiverIndex)
264 {
265     SFLReceiver *rcv;
266 
267     uint32_t rcvIdx = 0;
268     for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt)
269         if(receiverIndex == ++rcvIdx) return rcv;
270 
271     /* not found - ran off the end of the table */
272     return NULL;
273 }
274 
275 /*_________________---------------------------__________________
276   _________________ sfl_agent_getNextSampler  __________________
277   -----------------___________________________------------------
278 */
279 
sfl_agent_getNextSampler(SFLAgent * agent,SFLDataSource_instance * pdsi)280 SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
281 {
282     /* return the one lexograpically just after it - assume they are sorted
283        correctly according to the lexographical ordering of the object ids */
284     SFLSampler *sm = sfl_agent_getSampler(agent, pdsi);
285     return sm ? sm->nxt : NULL;
286 }
287 
288 /*_________________---------------------------__________________
289   _________________ sfl_agent_getNextPoller   __________________
290   -----------------___________________________------------------
291 */
292 
sfl_agent_getNextPoller(SFLAgent * agent,SFLDataSource_instance * pdsi)293 SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
294 {
295     /* return the one lexograpically just after it - assume they are sorted
296        correctly according to the lexographical ordering of the object ids */
297     SFLPoller *pl = sfl_agent_getPoller(agent, pdsi);
298     return pl ? pl->nxt : NULL;
299 }
300 
301 /*_________________---------------------------__________________
302   _________________ sfl_agent_getNextReceiver __________________
303   -----------------___________________________------------------
304 */
305 
sfl_agent_getNextReceiver(SFLAgent * agent,uint32_t receiverIndex)306 SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, uint32_t receiverIndex)
307 {
308     return sfl_agent_getReceiver(agent, receiverIndex + 1);
309 }
310 
311 
312 /*_________________---------------------------__________________
313   _________________ sfl_agent_resetReceiver   __________________
314   -----------------___________________________------------------
315 */
316 
sfl_agent_resetReceiver(SFLAgent * agent,SFLReceiver * receiver)317 void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver)
318 {
319     SFLReceiver *rcv;
320     SFLSampler *sm;
321     SFLPoller *pl;
322 
323     /* tell samplers and pollers to stop sending to this receiver */
324     /* first get his receiverIndex */
325     uint32_t rcvIdx = 0;
326     for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) {
327         rcvIdx++; /* thanks to Diego Valverde for pointing out this bugfix */
328         if(rcv == receiver) {
329             /* now tell anyone that is using it to stop */
330             for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
331                 if(sfl_sampler_get_sFlowFsReceiver(sm) == rcvIdx) sfl_sampler_set_sFlowFsReceiver(sm, 0);
332 
333             for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
334                 if(sfl_poller_get_sFlowCpReceiver(pl) == rcvIdx) sfl_poller_set_sFlowCpReceiver(pl, 0);
335 
336             break;
337         }
338     }
339 }
340 
341 
342 
343 
344 /* ===================================================*/
345 /* ===================== SAMPLER =====================*/
346 
347 /*_________________--------------------------__________________
348   _________________   sfl_sampler_init       __________________
349   -----------------__________________________------------------
350 */
351 
sfl_sampler_init(SFLSampler * sampler,SFLAgent * agent,SFLDataSource_instance * pdsi)352 void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi)
353 {
354     /* copy the dsi in case it points to sampler->dsi, which we are about to clear.
355        (Thanks to Jagjit Choudray of Force 10 Networks for pointing out this bug) */
356     SFLDataSource_instance dsi = *pdsi;
357 
358     /* preserve the *nxt pointer too, in case we are resetting this poller and it is
359        already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
360     SFLSampler *nxtPtr = sampler->nxt;
361 
362     /* clear everything */
363     memset(sampler, 0, sizeof(*sampler));
364 
365     /* restore the linked list ptr */
366     sampler->nxt = nxtPtr;
367 
368     /* now copy in the parameters */
369     sampler->agent = agent;
370     sampler->dsi = dsi;
371 
372     /* set defaults */
373     sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, SFL_DEFAULT_HEADER_SIZE);
374     sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, SFL_DEFAULT_SAMPLING_RATE);
375 }
376 
377 /*_________________--------------------------__________________
378   _________________       reset              __________________
379   -----------------__________________________------------------
380 */
381 
resetSampler(SFLSampler * sampler)382 static void resetSampler(SFLSampler *sampler)
383 {
384     SFLDataSource_instance dsi = sampler->dsi;
385     sfl_sampler_init(sampler, sampler->agent, &dsi);
386 }
387 
388 /*_________________---------------------------__________________
389   _________________      MIB access           __________________
390   -----------------___________________________------------------
391 */
sfl_sampler_get_sFlowFsReceiver(SFLSampler * sampler)392 uint32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler) {
393     return sampler->sFlowFsReceiver;
394 }
395 
sfl_sampler_set_sFlowFsReceiver(SFLSampler * sampler,uint32_t sFlowFsReceiver)396 void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, uint32_t sFlowFsReceiver) {
397     sampler->sFlowFsReceiver = sFlowFsReceiver;
398     if(sFlowFsReceiver == 0) resetSampler(sampler);
399     else {
400         /* retrieve and cache a direct pointer to my receiver */
401         sampler->myReceiver = sfl_agent_getReceiver(sampler->agent, sampler->sFlowFsReceiver);
402     }
403 }
404 
sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler * sampler)405 uint32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler) {
406     return sampler->sFlowFsPacketSamplingRate;
407 }
408 
sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler * sampler,uint32_t sFlowFsPacketSamplingRate)409 void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, uint32_t sFlowFsPacketSamplingRate) {
410     sampler->sFlowFsPacketSamplingRate = sFlowFsPacketSamplingRate;
411     /* initialize the skip count too */
412     sampler->skip = sFlowFsPacketSamplingRate ? sfl_random(sFlowFsPacketSamplingRate) : 0;
413 }
414 
sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler * sampler)415 uint32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler) {
416     return sampler->sFlowFsMaximumHeaderSize;
417 }
418 
sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler * sampler,uint32_t sFlowFsMaximumHeaderSize)419 void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, uint32_t sFlowFsMaximumHeaderSize) {
420     sampler->sFlowFsMaximumHeaderSize = sFlowFsMaximumHeaderSize;
421 }
422 
423 /* call this to set a maximum samples-per-second threshold. If the sampler reaches this
424    threshold it will automatically back off the sampling rate. A value of 0 disables the
425    mechanism */
426 
sfl_sampler_set_backoffThreshold(SFLSampler * sampler,uint32_t samplesPerSecond)427 void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, uint32_t samplesPerSecond) {
428     sampler->backoffThreshold = samplesPerSecond;
429 }
430 
sfl_sampler_get_backoffThreshold(SFLSampler * sampler)431 uint32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler) {
432     return sampler->backoffThreshold;
433 }
434 
sfl_sampler_get_samplesLastTick(SFLSampler * sampler)435 uint32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) {
436     return sampler->samplesLastTick;
437 }
438 
439 /*_________________---------------------------------__________________
440   _________________   sequence number reset         __________________
441   -----------------_________________________________------------------
442   Used by the agent to indicate a samplePool discontinuity
443   so that the sflow collector will know to ignore the next delta.
444 */
sfl_sampler_resetFlowSeqNo(SFLSampler * sampler)445 void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; }
446 
447 
448 /*_________________---------------------------__________________
449   _________________    sfl_sampler_tick       __________________
450   -----------------___________________________------------------
451 */
452 
sfl_sampler_tick(SFLSampler * sampler,time_t now)453 void sfl_sampler_tick(SFLSampler *sampler, time_t now)
454 {
455     if(sampler->backoffThreshold && sampler->samplesThisTick > sampler->backoffThreshold) {
456         /* automatic backoff.  If using hardware sampling then this is where you have to */
457         /* call out to change the sampling rate and make sure that any other registers/variables */
458         /* that hold this value are updated. */
459         sampler->sFlowFsPacketSamplingRate *= 2;
460     }
461     sampler->samplesLastTick = sampler->samplesThisTick;
462     sampler->samplesThisTick = 0;
463 }
464 
465 
466 
467 /*_________________------------------------------__________________
468   _________________ sfl_sampler_writeFlowSample  __________________
469   -----------------______________________________------------------
470 */
471 
sfl_sampler_writeFlowSample(SFLSampler * sampler,SFL_FLOW_SAMPLE_TYPE * fs)472 void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
473 {
474     if(fs == NULL) return;
475     sampler->samplesThisTick++;
476     /* increment the sequence number */
477     fs->sequence_number = ++sampler->flowSampleSeqNo;
478     /* copy the other header fields in */
479 #ifdef SFL_USE_32BIT_INDEX
480     fs->ds_class = SFL_DS_CLASS(sampler->dsi);
481     fs->ds_index = SFL_DS_INDEX(sampler->dsi);
482 #else
483     fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
484 #endif
485     /* the sampling rate may have been set already. */
486     if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
487     /* the samplePool may be maintained upstream too. */
488     if(fs->sample_pool == 0) fs->sample_pool = sampler->samplePool;
489     /* and the same for the drop event counter */
490     if(fs->drops == 0) fs->drops = sampler->dropEvents;
491     /* sent to my receiver */
492     if(sampler->myReceiver) sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
493 }
494 
495 /*_________________-------------------------------------__________________
496   _________________ sfl_sampler_writeEncodedFlowSample  __________________
497   -----------------_____________________________________------------------
498 */
499 
sfl_sampler_writeEncodedFlowSample(SFLSampler * sampler,char * xdrBytes,uint32_t len)500 void sfl_sampler_writeEncodedFlowSample(SFLSampler *sampler, char *xdrBytes, uint32_t len)
501 {
502     SFL_FLOW_SAMPLE_TYPE fs;
503     memset(&fs, 0, sizeof(fs));
504     sampler->samplesThisTick++;
505     /* increment the sequence number */
506     fs.sequence_number = ++sampler->flowSampleSeqNo;
507     /* copy the other header fields in */
508 #ifdef SFL_USE_32BIT_INDEX
509     fs.ds_class = SFL_DS_CLASS(sampler->dsi);
510     fs.ds_index = SFL_DS_INDEX(sampler->dsi);
511 #else
512     fs.source_id = SFL_DS_DATASOURCE(sampler->dsi);
513 #endif
514     fs.sampling_rate = sampler->sFlowFsPacketSamplingRate;
515     fs.sample_pool = sampler->samplePool;
516     fs.drops = sampler->dropEvents;
517     if(sampler->myReceiver) sfl_receiver_writeEncodedFlowSample(sampler->myReceiver, &fs, xdrBytes, len);
518 }
519 
520 /*_________________---------------------------__________________
521   _________________     sfl_random            __________________
522   -----------------___________________________------------------
523   Gerhard's generator
524 */
525 
526 static uint32_t SFLRandom = 1;
527 
sfl_random(uint32_t lim)528 uint32_t sfl_random(uint32_t lim) {
529     SFLRandom = ((SFLRandom * 32719) + 3) % 32749;
530     return ((SFLRandom % lim) + 1);
531 }
532 
sfl_random_init(uint32_t seed)533 void sfl_random_init(uint32_t seed) {
534     SFLRandom = seed;
535 }
536 
sfl_sampler_next_skip(SFLSampler * sampler)537 uint32_t sfl_sampler_next_skip(SFLSampler *sampler) {
538     return sfl_random((2 * sampler->sFlowFsPacketSamplingRate) - 1);
539 }
540 
541 /*_________________---------------------------__________________
542   _________________  sfl_sampler_takeSample   __________________
543   -----------------___________________________------------------
544 */
545 
sfl_sampler_takeSample(SFLSampler * sampler)546 int sfl_sampler_takeSample(SFLSampler *sampler)
547 {
548     /* increment the samplePool */
549     sampler->samplePool++;
550 
551     if(unlikely(--sampler->skip == 0)) {
552         /* reached zero. Set the next skip and return true. */
553         sampler->skip = sfl_sampler_next_skip(sampler);
554         return 1;
555     }
556     return 0;
557 }
558 
559 
560 
561 /* ===================================================*/
562 /* ===================== POLLER ======================*/
563 
564 /*_________________--------------------------__________________
565   _________________    sfl_poller_init       __________________
566   -----------------__________________________------------------
567 */
568 
sfl_poller_init(SFLPoller * poller,SFLAgent * agent,SFLDataSource_instance * pdsi,void * magic,getCountersFn_t getCountersFn)569 void sfl_poller_init(SFLPoller *poller,
570                      SFLAgent *agent,
571                      SFLDataSource_instance *pdsi,
572                      void *magic,         /* ptr to pass back in getCountersFn() */
573                      getCountersFn_t getCountersFn)
574 {
575     /* copy the dsi in case it points to poller->dsi, which we are about to clear */
576     SFLDataSource_instance dsi = *pdsi;
577 
578     /* preserve the *nxt pointer too, in case we are resetting this poller and it is
579        already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
580     SFLPoller *nxtPtr = poller->nxt;
581 
582     /* clear everything */
583     memset(poller, 0, sizeof(*poller));
584 
585     /* restore the linked list ptr */
586     poller->nxt = nxtPtr;
587 
588     /* now copy in the parameters */
589     poller->agent = agent;
590     poller->dsi = dsi; /* structure copy */
591     poller->magic = magic;
592     poller->getCountersFn = getCountersFn;
593 }
594 
595 /*_________________--------------------------__________________
596   _________________       reset              __________________
597   -----------------__________________________------------------
598 */
599 
resetPoller(SFLPoller * poller)600 static void resetPoller(SFLPoller *poller)
601 {
602     SFLDataSource_instance dsi = poller->dsi;
603     sfl_poller_init(poller, poller->agent, &dsi, poller->magic, poller->getCountersFn);
604 }
605 
606 /*_________________---------------------------__________________
607   _________________      MIB access           __________________
608   -----------------___________________________------------------
609 */
sfl_poller_get_sFlowCpReceiver(SFLPoller * poller)610 uint32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller) {
611     return poller->sFlowCpReceiver;
612 }
613 
sfl_poller_set_sFlowCpReceiver(SFLPoller * poller,uint32_t sFlowCpReceiver)614 void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, uint32_t sFlowCpReceiver) {
615     poller->sFlowCpReceiver = sFlowCpReceiver;
616     if(sFlowCpReceiver == 0) resetPoller(poller);
617     else {
618         /* retrieve and cache a direct pointer to my receiver */
619         poller->myReceiver = sfl_agent_getReceiver(poller->agent, poller->sFlowCpReceiver);
620     }
621 }
622 
sfl_poller_get_sFlowCpInterval(SFLPoller * poller)623 uint32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) {
624     return (uint32_t)poller->sFlowCpInterval;
625 }
626 
sfl_poller_set_sFlowCpInterval(SFLPoller * poller,uint32_t sFlowCpInterval)627 void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, uint32_t sFlowCpInterval) {
628     poller->sFlowCpInterval = sFlowCpInterval;
629     /* Set the countersCountdown to be a randomly selected value between 1 and
630        sFlowCpInterval. That way the counter polling would be desynchronised
631        (on a 200-port switch, polling all the counters in one second could be harmful). */
632     poller->countersCountdown = sFlowCpInterval ? sfl_random(sFlowCpInterval) : 0;
633 }
634 
635 /*_________________---------------------------------__________________
636   _________________   sequence number reset         __________________
637   -----------------_________________________________------------------
638   Used to indicate a counter discontinuity
639   so that the sflow collector will know to ignore the next delta.
640 */
sfl_poller_resetCountersSeqNo(SFLPoller * poller)641 void sfl_poller_resetCountersSeqNo(SFLPoller *poller) {  poller->countersSampleSeqNo = 0; }
642 
643 /*_________________---------------------------__________________
644   _________________    sfl_poller_tick        __________________
645   -----------------___________________________------------------
646 */
647 
sfl_poller_tick(SFLPoller * poller,time_t now)648 void sfl_poller_tick(SFLPoller *poller, time_t now)
649 {
650     if(poller->countersCountdown == 0) return; /* counters retrieval was not enabled */
651     if(poller->sFlowCpReceiver == 0) return;
652 
653     if(--poller->countersCountdown == 0) {
654         if(poller->getCountersFn != NULL) {
655             /* call out for counters */
656             SFL_COUNTERS_SAMPLE_TYPE cs;
657             memset(&cs, 0, sizeof(cs));
658             poller->getCountersFn(poller->magic, poller, &cs);
659             /* this countersFn is expected to fill in some counter block elements */
660             /* and then call sfl_poller_writeCountersSample(poller, &cs); */
661         }
662         /* reset the countdown */
663         poller->countersCountdown = poller->sFlowCpInterval;
664     }
665 }
666 
667 /*_________________---------------------------------__________________
668   _________________ sfl_poller_writeCountersSample  __________________
669   -----------------_________________________________------------------
670 */
671 
sfl_poller_writeCountersSample(SFLPoller * poller,SFL_COUNTERS_SAMPLE_TYPE * cs)672 void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs)
673 {
674     /* fill in the rest of the header fields, and send to the receiver */
675     cs->sequence_number = ++poller->countersSampleSeqNo;
676 #ifdef SFL_USE_32BIT_INDEX
677     cs->ds_class = SFL_DS_CLASS(poller->dsi);
678     cs->ds_index = SFL_DS_INDEX(poller->dsi);
679 #else
680     cs->source_id = SFL_DS_DATASOURCE(poller->dsi);
681 #endif
682     /* sent to my receiver */
683     if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs);
684 }
685 
686 
687 
688 
689 
690 /* ===================================================*/
691 /* ===================== RECEIVER ====================*/
692 
693 static void resetSampleCollector(SFLReceiver *receiver);
694 static void sendSample(SFLReceiver *receiver);
695 static void receiverError(SFLReceiver *receiver, char *errm);
696 static void putNet32(SFLReceiver *receiver, uint32_t val);
697 static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
698 static void putOpaque(SFLReceiver *receiver, char *val, int len);
699 
700 /*_________________--------------------------__________________
701   _________________    sfl_receiver_init     __________________
702   -----------------__________________________------------------
703 */
704 
sfl_receiver_init(SFLReceiver * receiver,SFLAgent * agent)705 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
706 {
707     /* first clear everything */
708     memset(receiver, 0, sizeof(*receiver));
709 
710     /* now copy in the parameters */
711     receiver->agent = agent;
712 
713     /* set defaults */
714     receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
715     receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
716 
717     /* prepare to receive the first sample */
718     resetSampleCollector(receiver);
719 }
720 
721 /*_________________---------------------------__________________
722   _________________      reset                __________________
723   -----------------___________________________------------------
724 
725   called on timeout, or when owner string is cleared
726 */
727 
resetReceiver(SFLReceiver * receiver)728 static void resetReceiver(SFLReceiver *receiver) {
729     /* ask agent to tell samplers and pollers to stop sending samples */
730     sfl_agent_resetReceiver(receiver->agent, receiver);
731     /* reinitialize */
732     sfl_receiver_init(receiver, receiver->agent);
733 }
734 
735 
736 /*_________________----------------------------------------_____________
737   _________________          MIB Vars                      _____________
738   -----------------________________________________________-------------
739 */
740 
sfl_receiver_get_sFlowRcvrOwner(SFLReceiver * receiver)741 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
742     return receiver->sFlowRcvrOwner;
743 }
sfl_receiver_set_sFlowRcvrOwner(SFLReceiver * receiver,char * sFlowRcvrOwner)744 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
745     receiver->sFlowRcvrOwner = sFlowRcvrOwner;
746     if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
747         /* reset condition! owner string was cleared */
748         resetReceiver(receiver);
749     }
750 }
sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver * receiver)751 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
752     return receiver->sFlowRcvrTimeout;
753 }
sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver * receiver,time_t sFlowRcvrTimeout)754 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
755     receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
756 }
sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver * receiver)757 uint32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
758     return receiver->sFlowRcvrMaximumDatagramSize;
759 }
sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver * receiver,uint32_t sFlowRcvrMaximumDatagramSize)760 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, uint32_t sFlowRcvrMaximumDatagramSize) {
761     uint32_t mdz = sFlowRcvrMaximumDatagramSize;
762     if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
763     receiver->sFlowRcvrMaximumDatagramSize = mdz;
764 }
sfl_receiver_get_sFlowRcvrAddress(SFLReceiver * receiver)765 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
766     return &receiver->sFlowRcvrAddress;
767 }
sfl_receiver_set_sFlowRcvrAddress(SFLReceiver * receiver,SFLAddress * sFlowRcvrAddress)768 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
769     if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; /* structure copy */
770 }
sfl_receiver_get_sFlowRcvrPort(SFLReceiver * receiver)771 uint32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
772     return receiver->sFlowRcvrPort;
773 }
sfl_receiver_set_sFlowRcvrPort(SFLReceiver * receiver,uint32_t sFlowRcvrPort)774 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, uint32_t sFlowRcvrPort) {
775     receiver->sFlowRcvrPort = sFlowRcvrPort;
776 }
777 
778 /*_________________---------------------------__________________
779   _________________   sfl_receiver_tick       __________________
780   -----------------___________________________------------------
781 */
782 
sfl_receiver_tick(SFLReceiver * receiver,time_t now)783 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
784 {
785     /* if there are any samples to send, flush them now */
786     if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
787     /* check the timeout */
788     if(receiver->sFlowRcvrTimeout && (uint32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
789         /* count down one tick and reset if we reach 0 */
790         if(--receiver->sFlowRcvrTimeout == 0) resetReceiver(receiver);
791     }
792 }
793 
794 /*_________________-----------------------------__________________
795   _________________   receiver write utilities  __________________
796   -----------------_____________________________------------------
797 */
798 
put32(SFLReceiver * receiver,uint32_t val)799 static void put32(SFLReceiver *receiver, uint32_t val)
800 {
801     *receiver->sampleCollector.datap++ = val;
802 }
803 
putNet32(SFLReceiver * receiver,uint32_t val)804 static void putNet32(SFLReceiver *receiver, uint32_t val)
805 {
806     *receiver->sampleCollector.datap++ = htonl(val);
807 }
808 
putNet64(SFLReceiver * receiver,uint64_t val64)809 static void putNet64(SFLReceiver *receiver, uint64_t val64)
810 {
811     uint32_t *firstQuadPtr = receiver->sampleCollector.datap;
812     /* first copy the bytes in */
813     memcpy((byte_t *)firstQuadPtr, &val64, 8);
814     if(htonl(1) != 1) {
815         /* swap the bytes, and reverse the quads too */
816         uint32_t tmp = *receiver->sampleCollector.datap++;
817         *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
818         *receiver->sampleCollector.datap++ = htonl(tmp);
819     }
820     else receiver->sampleCollector.datap += 2;
821 }
822 
put128(SFLReceiver * receiver,byte_t * val)823 static void put128(SFLReceiver *receiver, byte_t *val)
824 {
825     memcpy(receiver->sampleCollector.datap, val, 16);
826     receiver->sampleCollector.datap += 4;
827 }
828 
putString(SFLReceiver * receiver,SFLString * s)829 static void putString(SFLReceiver *receiver, SFLString *s)
830 {
831     putNet32(receiver, s->len);
832     memcpy(receiver->sampleCollector.datap, s->str, s->len);
833     receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
834 }
835 
stringEncodingLength(SFLString * s)836 static uint32_t stringEncodingLength(SFLString *s) {
837     /* answer in bytes,  so remember to mulitply by 4 after rounding up to nearest 4-byte boundary */
838     return 4 + (((s->len + 3) / 4) * 4);
839 }
840 
putAddress(SFLReceiver * receiver,SFLAddress * addr)841 static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
842 {
843     /* encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error? */
844     if(addr->type == 0) {
845         putNet32(receiver, SFLADDRESSTYPE_IP_V4);
846         put32(receiver, 0);
847     }
848     else {
849         putNet32(receiver, addr->type);
850         if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
851         else put128(receiver, addr->address.ip_v6.addr);
852     }
853 }
854 
putOpaque(SFLReceiver * receiver,char * val,int len)855 static void putOpaque(SFLReceiver *receiver, char *val, int len)
856 {
857     memcpy((char *)receiver->sampleCollector.datap, val, len);
858     receiver->sampleCollector.datap += ((len+3)/4);
859 }
860 
httpOpEncodingLength(SFLSampled_http * op)861 static uint32_t httpOpEncodingLength(SFLSampled_http *op) {
862   uint32_t elemSiz = stringEncodingLength(&op->uri);
863   elemSiz += stringEncodingLength(&op->host);
864   elemSiz += stringEncodingLength(&op->referrer);
865   elemSiz += stringEncodingLength(&op->useragent);
866   elemSiz += stringEncodingLength(&op->xff);
867   elemSiz += stringEncodingLength(&op->authuser);
868   elemSiz += stringEncodingLength(&op->mimetype);
869   elemSiz += 32; /* method, protocol, req_bytes, resp_bytes, uS, status */
870   return elemSiz;
871 }
872 
putSocket4(SFLReceiver * receiver,SFLExtended_socket_ipv4 * socket4)873 static void putSocket4(SFLReceiver *receiver, SFLExtended_socket_ipv4 *socket4) {
874     putNet32(receiver, socket4->protocol);
875     put32(receiver, socket4->local_ip.addr);
876     put32(receiver, socket4->remote_ip.addr);
877     putNet32(receiver, socket4->local_port);
878     putNet32(receiver, socket4->remote_port);
879 }
880 
putSocket6(SFLReceiver * receiver,SFLExtended_socket_ipv6 * socket6)881 static void putSocket6(SFLReceiver *receiver, SFLExtended_socket_ipv6 *socket6) {
882     putNet32(receiver, socket6->protocol);
883     put128(receiver, socket6->local_ip.addr);
884     put128(receiver, socket6->remote_ip.addr);
885     putNet32(receiver, socket6->local_port);
886     putNet32(receiver, socket6->remote_port);
887 }
888 
889 
890 /*_________________-----------------------------__________________
891   _________________      computeFlowSampleSize  __________________
892   -----------------_____________________________------------------
893 */
894 
computeFlowSampleSize(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs)895 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
896 {
897     SFLFlow_sample_element *elem;
898     uint32_t elemSiz;
899 #ifdef SFL_USE_32BIT_INDEX
900     uint siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
901                       sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
902 #else
903     uint32_t siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
904                           sample_pool, drops, input, output, number of elements */
905 #endif
906 
907     /* hard code the wire-encoding sizes, in case the structures are expanded to be 64-bit aligned */
908 
909     fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
910     for(elem = fs->elements; elem != NULL; elem = elem->nxt) {
911         fs->num_elements++;
912         siz += 8; /* tag, length */
913         elemSiz = 0;
914         switch(elem->tag) {
915         case SFLFLOW_HTTP: elemSiz = httpOpEncodingLength(&elem->flowType.http);  break;
916         case SFLFLOW_EX_SOCKET4: elemSiz = XDRSIZ_SFLEXTENDED_SOCKET4;  break;
917         case SFLFLOW_EX_SOCKET6: elemSiz = XDRSIZ_SFLEXTENDED_SOCKET6;  break;
918         default:
919             {
920                 char errm[MAX_ERRMSG_LEN];
921                 snprintf(errm, MAX_ERRMSG_LEN, "computeFlowSampleSize(): unexpected tag (%ud)", elem->tag);
922                 receiverError(receiver, errm);
923                 return -1;
924             }
925             break;
926         }
927         /* cache the element size, and accumulate it into the overall FlowSample size */
928         elem->length = elemSiz;
929         siz += elemSiz;
930     }
931 
932     return siz;
933 }
934 
935 /*_________________-------------------------------__________________
936   _________________ sfl_receiver_writeFlowSample  __________________
937   -----------------_______________________________------------------
938 */
939 
sfl_receiver_writeFlowSample(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs)940 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
941 {
942     int packedSize;
943     SFLFlow_sample_element *elem;
944     uint32_t encodingSize;
945 
946     if(fs == NULL) return -1;
947     if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
948 
949     /* check in case this one sample alone is too big for the datagram */
950     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize)) {
951         receiverError(receiver, "flow sample too big for datagram");
952         return -1;
953     }
954 
955     /* if the sample pkt is full enough so that this sample might put */
956     /* it over the limit, then we should send it now before going on. */
957     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
958         sendSample(receiver);
959 
960     receiver->sampleCollector.numSamples++;
961 
962 #ifdef SFL_USE_32BIT_INDEX
963     putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
964 #else
965     putNet32(receiver, SFLFLOW_SAMPLE);
966 #endif
967 
968     putNet32(receiver, packedSize - 8); /* don't include tag and len */
969     putNet32(receiver, fs->sequence_number);
970 
971 #ifdef SFL_USE_32BIT_INDEX
972     putNet32(receiver, fs->ds_class);
973     putNet32(receiver, fs->ds_index);
974 #else
975     putNet32(receiver, fs->source_id);
976 #endif
977 
978     putNet32(receiver, fs->sampling_rate);
979     putNet32(receiver, fs->sample_pool);
980     putNet32(receiver, fs->drops);
981 
982 #ifdef SFL_USE_32BIT_INDEX
983     putNet32(receiver, fs->inputFormat);
984     putNet32(receiver, fs->input);
985     putNet32(receiver, fs->outputFormat);
986     putNet32(receiver, fs->output);
987 #else
988     putNet32(receiver, fs->input);
989     putNet32(receiver, fs->output);
990 #endif
991 
992     putNet32(receiver, fs->num_elements);
993 
994     for(elem = fs->elements; elem != NULL; elem = elem->nxt) {
995 
996         putNet32(receiver, elem->tag);
997         putNet32(receiver, elem->length); /* length cached in computeFlowSampleSize() */
998 
999         switch(elem->tag) {
1000         case SFLFLOW_EX_SOCKET4: putSocket4(receiver, &elem->flowType.socket4); break;
1001         case SFLFLOW_EX_SOCKET6: putSocket6(receiver, &elem->flowType.socket6); break;
1002         case SFLFLOW_HTTP:
1003             putNet32(receiver, elem->flowType.http.method);
1004             putNet32(receiver, elem->flowType.http.protocol);
1005             putString(receiver, &elem->flowType.http.uri);
1006             putString(receiver, &elem->flowType.http.host);
1007             putString(receiver, &elem->flowType.http.referrer);
1008             putString(receiver, &elem->flowType.http.useragent);
1009             putString(receiver, &elem->flowType.http.xff);
1010             putString(receiver, &elem->flowType.http.authuser);
1011             putString(receiver, &elem->flowType.http.mimetype);
1012             putNet64(receiver, elem->flowType.http.req_bytes);
1013             putNet64(receiver, elem->flowType.http.resp_bytes);
1014             putNet32(receiver, elem->flowType.http.uS);
1015             putNet32(receiver, elem->flowType.http.status);
1016             break;
1017         default:
1018             {
1019                 char errm[MAX_ERRMSG_LEN];
1020                 snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeFlowSample: unexpected tag (%ud)", elem->tag);
1021                 receiverError(receiver, errm);
1022                 return -1;
1023             }
1024             break;
1025         }
1026     }
1027 
1028     /* sanity check */
1029     encodingSize = (byte_t *)receiver->sampleCollector.datap
1030         - (byte_t *)receiver->sampleCollector.data
1031         - receiver->sampleCollector.pktlen;
1032 
1033     if(encodingSize != (uint32_t)packedSize) {
1034         char errm[MAX_ERRMSG_LEN];
1035         snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeFlowSample: encoding_size(%ud) != expected_size(%ud)",
1036                      encodingSize,
1037                      packedSize);
1038         receiverError(receiver, errm);
1039         return -1;
1040     }
1041 
1042     /* update the pktlen */
1043     receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1044     return packedSize;
1045 }
1046 
1047 /*_________________--------------------------------------__________________
1048   _________________ sfl_receiver_writeEncodedFlowSample  __________________
1049   -----------------______________________________________------------------
1050 */
1051 
sfl_receiver_writeEncodedFlowSample(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs,char * xdrBytes,uint32_t packedSize)1052 int sfl_receiver_writeEncodedFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs, char *xdrBytes, uint32_t packedSize)
1053 {
1054     uint32_t encodingSize;
1055     uint32_t overrideEncodingSize;
1056     uint32_t xdrHdrStrip;
1057 
1058     /* check in case this one sample alone is too big for the datagram */
1059     if(packedSize > receiver->sFlowRcvrMaximumDatagramSize) {
1060         receiverError(receiver, "flow sample too big for datagram");
1061         return -1;
1062     }
1063 
1064     /* if the sample pkt is full enough so that this sample might put */
1065     /* it over the limit, then we should send it now before going on. */
1066     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
1067         sendSample(receiver);
1068 
1069     receiver->sampleCollector.numSamples++;
1070 
1071 #ifdef SFL_USE_32BIT_INDEX
1072     putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
1073 #else
1074     putNet32(receiver, SFLFLOW_SAMPLE);
1075 #endif
1076 
1077     putNet32(receiver, packedSize - 8); /* don't include tag and len bytes in the length */
1078     putNet32(receiver, fs->sequence_number);
1079 
1080 #ifdef SFL_USE_32BIT_INDEX
1081     putNet32(receiver, fs->ds_class);
1082     putNet32(receiver, fs->ds_index);
1083 #else
1084     putNet32(receiver, fs->source_id);
1085 #endif
1086 
1087     putNet32(receiver, fs->sampling_rate);
1088     putNet32(receiver, fs->sample_pool);
1089     putNet32(receiver, fs->drops);
1090 
1091     /* sanity check */
1092     overrideEncodingSize = (byte_t *)receiver->sampleCollector.datap
1093         - (byte_t *)receiver->sampleCollector.data
1094         - receiver->sampleCollector.pktlen;
1095 
1096 #ifdef SFL_USE_32BIT_INDEX
1097     xdrHdrStrip = 32; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
1098                          sample_pool, drops, [inputFormat, input, outputFormat, output, number of elements...] */
1099 #else
1100     xdrHdrStrip = 28; /* tag, length, sequence_number, source_id, sampling_rate,
1101                          sample_pool, drops, [input, output, number of elements...] */
1102 #endif
1103 
1104     memcpy(receiver->sampleCollector.datap, xdrBytes + xdrHdrStrip, packedSize - xdrHdrStrip);
1105     receiver->sampleCollector.datap += ((packedSize - xdrHdrStrip) >> 2);
1106 
1107     /* sanity check */
1108     encodingSize = (byte_t *)receiver->sampleCollector.datap
1109         - (byte_t *)receiver->sampleCollector.data
1110         - receiver->sampleCollector.pktlen;
1111 
1112     if(encodingSize != (uint32_t)packedSize) {
1113         char errm[MAX_ERRMSG_LEN];
1114         snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeEncodedFlowSample: encoding_size(%ud) != expected_size(%ud) [overrideEncodingSize=%ud xdrHeaderStrip=%ud pktlen=%ud]",
1115                      encodingSize,
1116                      packedSize,
1117                      overrideEncodingSize,
1118                      xdrHdrStrip,
1119                      receiver->sampleCollector.pktlen);
1120         receiverError(receiver, errm);
1121         return -1;
1122     }
1123 
1124     /* update the pktlen */
1125     receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1126     return packedSize;
1127 }
1128 
1129 /*_________________-----------------------------__________________
1130   _________________ computeCountersSampleSize   __________________
1131   -----------------_____________________________------------------
1132 */
1133 
computeCountersSampleSize(SFLReceiver * receiver,SFL_COUNTERS_SAMPLE_TYPE * cs)1134 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
1135 {
1136     SFLCounters_sample_element *elem;
1137     uint32_t elemSiz;
1138 
1139 #ifdef SFL_USE_32BIT_INDEX
1140     uint siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
1141 #else
1142     uint32_t siz = 20; /* tag, length, sequence_number, source_id, number of elements */
1143 #endif
1144 
1145     cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
1146     for( elem = cs->elements; elem != NULL; elem = elem->nxt) {
1147         cs->num_elements++;
1148         siz += 8; /* tag, length */
1149         elemSiz = 0;
1150 
1151         /* hard code the wire-encoding sizes rather than use sizeof() -- in case the
1152            structures are expanded to be 64-bit aligned */
1153 
1154         switch(elem->tag) {
1155         case SFLCOUNTERS_HOST_PAR: elemSiz = 8 /*sizeof(elem->counterBlock.host_par)*/;  break;
1156         case SFLCOUNTERS_HTTP: elemSiz = XDRSIZ_SFLHTTP_COUNTERS /*sizeof(elem->counterBlock.http)*/;  break;
1157         default:
1158             {
1159                 char errm[MAX_ERRMSG_LEN];
1160                 snprintf(errm, MAX_ERRMSG_LEN, "computeCounterSampleSize(): unexpected counters tag (%ud)", elem->tag);
1161                 receiverError(receiver, errm);
1162                 return -1;
1163             }
1164             break;
1165         }
1166         /* cache the element size, and accumulate it into the overall FlowSample size */
1167         elem->length = elemSiz;
1168         siz += elemSiz;
1169     }
1170     return siz;
1171 }
1172 
1173 /*_________________----------------------------------__________________
1174   _________________ sfl_receiver_writeCountersSample __________________
1175   -----------------__________________________________------------------
1176 */
1177 
sfl_receiver_writeCountersSample(SFLReceiver * receiver,SFL_COUNTERS_SAMPLE_TYPE * cs)1178 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
1179 {
1180     int packedSize;
1181     SFLCounters_sample_element *elem;
1182     uint32_t encodingSize;
1183 
1184     if(cs == NULL) return -1;
1185     /* if the sample pkt is full enough so that this sample might put */
1186     /* it over the limit, then we should send it now. */
1187     if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
1188 
1189     /* check in case this one sample alone is too big for the datagram */
1190     /* in fact - if it is even half as big then we should ditch it. Very */
1191     /* important to avoid overruning the packet buffer. */
1192     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
1193         receiverError(receiver, "counters sample too big for datagram");
1194         return -1;
1195     }
1196 
1197     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
1198         sendSample(receiver);
1199 
1200     receiver->sampleCollector.numSamples++;
1201 
1202 #ifdef SFL_USE_32BIT_INDEX
1203     putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
1204 #else
1205     putNet32(receiver, SFLCOUNTERS_SAMPLE);
1206 #endif
1207 
1208     putNet32(receiver, packedSize - 8); /* tag and length not included */
1209     putNet32(receiver, cs->sequence_number);
1210 
1211 #ifdef SFL_USE_32BIT_INDEX
1212     putNet32(receiver, cs->ds_class);
1213     putNet32(receiver, cs->ds_index);
1214 #else
1215     putNet32(receiver, cs->source_id);
1216 #endif
1217 
1218     putNet32(receiver, cs->num_elements);
1219 
1220     for(elem = cs->elements; elem != NULL; elem = elem->nxt) {
1221 
1222         putNet32(receiver, elem->tag);
1223         putNet32(receiver, elem->length); /* length cached in computeCountersSampleSize() */
1224 
1225         switch(elem->tag) {
1226         case SFLCOUNTERS_HOST_PAR:
1227             putNet32(receiver, elem->counterBlock.host_par.dsClass);
1228             putNet32(receiver, elem->counterBlock.host_par.dsIndex);
1229             break;
1230         case SFLCOUNTERS_HTTP:
1231             putNet32(receiver, elem->counterBlock.http.method_option_count);
1232             putNet32(receiver, elem->counterBlock.http.method_get_count);
1233             putNet32(receiver, elem->counterBlock.http.method_head_count);
1234             putNet32(receiver, elem->counterBlock.http.method_post_count);
1235             putNet32(receiver, elem->counterBlock.http.method_put_count);
1236             putNet32(receiver, elem->counterBlock.http.method_delete_count);
1237             putNet32(receiver, elem->counterBlock.http.method_trace_count);
1238             putNet32(receiver, elem->counterBlock.http.method_connect_count);
1239             putNet32(receiver, elem->counterBlock.http.method_other_count);
1240             putNet32(receiver, elem->counterBlock.http.status_1XX_count);
1241             putNet32(receiver, elem->counterBlock.http.status_2XX_count);
1242             putNet32(receiver, elem->counterBlock.http.status_3XX_count);
1243             putNet32(receiver, elem->counterBlock.http.status_4XX_count);
1244             putNet32(receiver, elem->counterBlock.http.status_5XX_count);
1245             putNet32(receiver, elem->counterBlock.http.status_other_count);
1246             break;
1247         default:
1248             {
1249                 char errm[MAX_ERRMSG_LEN];
1250                 snprintf(errm, MAX_ERRMSG_LEN, "unexpected counters tag (%ud)", elem->tag);
1251                 receiverError(receiver, errm);
1252                 return -1;
1253             }
1254             break;
1255         }
1256     }
1257     /* sanity check */
1258     encodingSize = (byte_t *)receiver->sampleCollector.datap
1259         - (byte_t *)receiver->sampleCollector.data
1260         - receiver->sampleCollector.pktlen;
1261     if(encodingSize != (uint32_t)packedSize) {
1262         char errm[MAX_ERRMSG_LEN];
1263         snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeCountersSample: encoding_size(%ud) != expected_size(%ud)",
1264                      encodingSize,
1265                      packedSize);
1266         receiverError(receiver, errm);
1267         return -1;
1268     }
1269 
1270     /* update the pktlen */
1271     receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1272     return packedSize;
1273 }
1274 
1275 /*_________________---------------------------------__________________
1276   _________________ sfl_receiver_samplePacketsSent  __________________
1277   -----------------_________________________________------------------
1278 */
1279 
sfl_receiver_samplePacketsSent(SFLReceiver * receiver)1280 uint32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
1281 {
1282     return receiver->sampleCollector.packetSeqNo;
1283 }
1284 
1285 /*_________________---------------------------__________________
1286   _________________     sendSample            __________________
1287   -----------------___________________________------------------
1288 */
1289 
sendSample(SFLReceiver * receiver)1290 static void sendSample(SFLReceiver *receiver)
1291 {
1292     /* construct and send out the sample, then reset for the next one... */
1293     SFLAgent *agent = receiver->agent;
1294 
1295     /* go back and fill in the header */
1296     receiver->sampleCollector.datap = receiver->sampleCollector.data;
1297     putNet32(receiver, SFLDATAGRAM_VERSION5);
1298     putAddress(receiver, &agent->myIP);
1299     putNet32(receiver, agent->subId);
1300     putNet32(receiver, ++receiver->sampleCollector.packetSeqNo);
1301     putNet32(receiver,  (uint32_t)((agent->now - agent->bootTime) * 1000));
1302     putNet32(receiver, receiver->sampleCollector.numSamples);
1303 
1304     /* send */
1305     if(agent->sendFn) (*agent->sendFn)(agent->magic,
1306                                        agent,
1307                                        receiver,
1308                                        (byte_t *)receiver->sampleCollector.data,
1309                                        receiver->sampleCollector.pktlen);
1310 
1311     /* reset for the next time */
1312     resetSampleCollector(receiver);
1313 }
1314 
1315 /*_________________---------------------------__________________
1316   _________________   resetSampleCollector    __________________
1317   -----------------___________________________------------------
1318 */
1319 
resetSampleCollector(SFLReceiver * receiver)1320 static void resetSampleCollector(SFLReceiver *receiver)
1321 {
1322     receiver->sampleCollector.pktlen = 0;
1323     receiver->sampleCollector.numSamples = 0;
1324 
1325     /* clear the buffer completely (ensures that pad bytes will always be zeros - thank you CW) */
1326     memset((byte_t *)receiver->sampleCollector.data, 0, (SFL_SAMPLECOLLECTOR_DATA_QUADS * 4));
1327 
1328     /* point the datap to just after the header */
1329     receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
1330         (receiver->sampleCollector.data + 10) :
1331         (receiver->sampleCollector.data + 7);
1332 
1333     /* start pktlen with the right value */
1334     receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1335 }
1336 
1337 /*_________________---------------------------__________________
1338   _________________    receiverError          __________________
1339   -----------------___________________________------------------
1340 */
1341 
receiverError(SFLReceiver * receiver,char * msg)1342 static void receiverError(SFLReceiver *receiver, char *msg)
1343 {
1344     sfl_agent_error(receiver->agent, "receiver", msg);
1345     resetSampleCollector(receiver);
1346 }
1347 
1348 
1349 /*_________________---------------------------__________________
1350   _________________         exposure          __________________
1351   -----------------___________________________------------------
1352 selective exposure of some internal hooks,  just for this project
1353 */
1354 
sfl_receiver_put32(SFLReceiver * receiver,uint32_t val)1355 void sfl_receiver_put32(SFLReceiver *receiver, uint32_t val) { put32(receiver, val); }
sfl_receiver_putOpaque(SFLReceiver * receiver,char * val,int len)1356 void sfl_receiver_putOpaque(SFLReceiver *receiver, char *val, int len) { putOpaque(receiver, val, len); }
sfl_receiver_resetSampleCollector(SFLReceiver * receiver)1357 void sfl_receiver_resetSampleCollector(SFLReceiver *receiver) { resetSampleCollector(receiver); }
1358