1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* Copyright (c) 2002-2014 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
3 /* http://www.inmon.com/technology/sflowlicense.txt */
4
5 #include "ngx_http_sflow_api.h"
6
7 /* ===================================================*/
8 /* ===================== AGENT =======================*/
9
10
11 static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler);
12 static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler);
13
14 /*_________________---------------------------__________________
15 _________________ error logging __________________
16 -----------------___________________________------------------
17 */
18 #define MAX_ERRMSG_LEN 1000
19
sfl_agent_error(SFLAgent * agent,char * modName,char * msg)20 void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
21 {
22 char errm[MAX_ERRMSG_LEN];
23 snprintf(errm, MAX_ERRMSG_LEN, "sfl_agent_error: %s: %s\n", modName, msg);
24 if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
25 }
26
27 /*________________--------------------------__________________
28 ________________ sfl_agent_init __________________
29 ----------------__________________________------------------
30 */
31
sfl_agent_init(SFLAgent * agent,SFLAddress * myIP,uint32_t subId,time_t bootTime,time_t now,void * magic,errorFn_t errorFn,sendFn_t sendFn)32 void sfl_agent_init(SFLAgent *agent,
33 SFLAddress *myIP, /* IP address of this agent in net byte order */
34 uint32_t subId, /* agent_sub_id */
35 time_t bootTime, /* agent boot time */
36 time_t now, /* time now */
37 void *magic, /* ptr to pass back in logging and alloc fns */
38 errorFn_t errorFn,
39 sendFn_t sendFn)
40 {
41 /* first clear everything */
42 memset(agent, 0, sizeof(*agent));
43 /* now copy in the parameters */
44 agent->myIP = *myIP; /* structure copy */
45 agent->subId = subId;
46 agent->bootTime = bootTime;
47 agent->now = now;
48 agent->magic = magic;
49 agent->errorFn = errorFn;
50 agent->sendFn = sendFn;
51 }
52
53 /*_________________---------------------------__________________
54 _________________ sfl_agent_tick __________________
55 -----------------___________________________------------------
56 */
57
sfl_agent_tick(SFLAgent * agent,time_t now)58 void sfl_agent_tick(SFLAgent *agent, time_t now)
59 {
60 SFLReceiver *rcv;
61 SFLSampler *sm;
62 SFLPoller *pl;
63
64 agent->now = now;
65 /* receivers use ticks to flush send data */
66 for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) sfl_receiver_tick(rcv, now);
67 /* samplers use ticks to decide when they are sampling too fast */
68 for( sm = agent->samplers; sm != NULL; sm = sm->nxt) sfl_sampler_tick(sm, now);
69 /* pollers use ticks to decide when to ask for counters */
70 for( pl = agent->pollers; pl != NULL; pl = pl->nxt) sfl_poller_tick(pl, now);
71 }
72
73 /*_________________---------------------------__________________
74 _________________ sfl_agent_addReceiver __________________
75 -----------------___________________________------------------
76 */
77
sfl_agent_addReceiver(SFLAgent * agent,SFLReceiver * rcv)78 SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent, SFLReceiver *rcv)
79 {
80 SFLReceiver *r, *prev;
81
82 prev = NULL;
83 sfl_receiver_init(rcv, agent);
84 /* add to end of list - to preserve the receiver index numbers for existing receivers */
85
86 for(r = agent->receivers; r != NULL; prev = r, r = r->nxt);
87 if(prev) prev->nxt = rcv;
88 else agent->receivers = rcv;
89 rcv->nxt = NULL;
90 return rcv;
91 }
92
93 /*_________________---------------------------__________________
94 _________________ sfl_dsi_compare __________________
95 -----------------___________________________------------------
96
97 Note that if there is a mixture of ds_classes for this agent, then
98 the simple numeric comparison may not be correct - the sort order (for
99 the purposes of the SNMP MIB) should really be determined by the OID
100 that these numeric ds_class numbers are a shorthand for. For example,
101 ds_class == 0 means ifIndex, which is the oid "1.3.6.1.2.1.2.2.1"
102 */
103
sfl_dsi_compare(SFLDataSource_instance * pdsi1,SFLDataSource_instance * pdsi2)104 static int sfl_dsi_compare(SFLDataSource_instance *pdsi1, SFLDataSource_instance *pdsi2) {
105 /* could have used just memcmp(), but not sure if that would */
106 /* give the right answer on little-endian platforms. Safer to be explicit... */
107 int cmp = pdsi2->ds_class - pdsi1->ds_class;
108 if(cmp == 0) cmp = pdsi2->ds_index - pdsi1->ds_index;
109 if(cmp == 0) cmp = pdsi2->ds_instance - pdsi1->ds_instance;
110 return cmp;
111 }
112
113 /*_________________---------------------------__________________
114 _________________ sfl_agent_addSampler __________________
115 -----------------___________________________------------------
116 */
117
sfl_agent_addSampler(SFLAgent * agent,SFLDataSource_instance * pdsi,SFLSampler * newsm)118 SFLSampler *sfl_agent_addSampler(SFLAgent *agent,
119 SFLDataSource_instance *pdsi,
120 SFLSampler *newsm)
121 {
122 SFLSampler *prev, *sm, *test;
123
124 prev = NULL;
125 sm = agent->samplers;
126 /* keep the list sorted */
127 for(; sm != NULL; prev = sm, sm = sm->nxt) {
128 int64_t cmp = sfl_dsi_compare(pdsi, &sm->dsi);
129 if(cmp == 0) return sm; /* found - return existing one */
130 if(cmp < 0) break; /* insert here */
131 }
132 /* either we found the insert point, or reached the end of the list... */
133 sfl_sampler_init(newsm, agent, pdsi);
134 if(prev) prev->nxt = newsm;
135 else agent->samplers = newsm;
136 newsm->nxt = sm;
137
138 /* see if we should go in the ifIndex jumpTable */
139 if(SFL_DS_CLASS(newsm->dsi) == 0) {
140 test = sfl_agent_getSamplerByIfIndex(agent, SFL_DS_INDEX(newsm->dsi));
141 if(test && (SFL_DS_INSTANCE(newsm->dsi) < SFL_DS_INSTANCE(test->dsi))) {
142 /* replace with this new one because it has a lower ds_instance number */
143 sfl_agent_jumpTableRemove(agent, test);
144 test = NULL;
145 }
146 if(test == NULL) sfl_agent_jumpTableAdd(agent, newsm);
147 }
148 return newsm;
149 }
150
151 /*_________________---------------------------__________________
152 _________________ sfl_agent_addPoller __________________
153 -----------------___________________________------------------
154 */
155
sfl_agent_addPoller(SFLAgent * agent,SFLDataSource_instance * pdsi,void * magic,getCountersFn_t getCountersFn,SFLPoller * newpl)156 SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
157 SFLDataSource_instance *pdsi,
158 void *magic, /* ptr to pass back in getCountersFn() */
159 getCountersFn_t getCountersFn,
160 SFLPoller *newpl)
161 {
162 /* keep the list sorted */
163 SFLPoller *prev = NULL, *pl = agent->pollers;
164 for(; pl != NULL; prev = pl, pl = pl->nxt) {
165 int64_t cmp = sfl_dsi_compare(pdsi, &pl->dsi);
166 if(cmp == 0) return pl; /* found - return existing one */
167 if(cmp < 0) break; /* insert here */
168 }
169 /* either we found the insert point, or reached the end of the list... */
170 sfl_poller_init(newpl, agent, pdsi, magic, getCountersFn);
171 if(prev) prev->nxt = newpl;
172 else agent->pollers = newpl;
173 newpl->nxt = pl;
174 return newpl;
175 }
176
177 /*_________________--------------------------------__________________
178 _________________ sfl_agent_jumpTableAdd __________________
179 -----------------________________________________------------------
180 */
181
sfl_agent_jumpTableAdd(SFLAgent * agent,SFLSampler * sampler)182 static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler)
183 {
184 uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
185 sampler->hash_nxt = agent->jumpTable[hashIndex];
186 agent->jumpTable[hashIndex] = sampler;
187 }
188
189 /*_________________--------------------------------__________________
190 _________________ sfl_agent_jumpTableRemove __________________
191 -----------------________________________________------------------
192 */
193
sfl_agent_jumpTableRemove(SFLAgent * agent,SFLSampler * sampler)194 static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler)
195 {
196 uint32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
197 SFLSampler *search = agent->jumpTable[hashIndex], *prev = NULL;
198 for( ; search != NULL; prev = search, search = search->hash_nxt) if(search == sampler) break;
199 if(search) {
200 /* found - unlink */
201 if(prev) prev->hash_nxt = search->hash_nxt;
202 else agent->jumpTable[hashIndex] = search->hash_nxt;
203 search->hash_nxt = NULL;
204 }
205 }
206
207 /*_________________--------------------------------__________________
208 _________________ sfl_agent_getSamplerByIfIndex __________________
209 -----------------________________________________------------------
210 fast lookup (pointers cached in hash table). If there are multiple
211 sampler instances for a given ifIndex, then this fn will return
212 the one with the lowest instance number. Since the samplers
213 list is sorted, this means the other instances will be accesible
214 by following the sampler->nxt pointer (until the ds_class
215 or ds_index changes). This is helpful if you need to offer
216 the same flowSample to multiple samplers.
217 */
218
sfl_agent_getSamplerByIfIndex(SFLAgent * agent,uint32_t ifIndex)219 SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, uint32_t ifIndex)
220 {
221 SFLSampler *search = agent->jumpTable[ifIndex % SFL_HASHTABLE_SIZ];
222 for( ; search != NULL; search = search->hash_nxt) if(SFL_DS_INDEX(search->dsi) == ifIndex) break;
223 return search;
224 }
225
226 /*_________________---------------------------__________________
227 _________________ sfl_agent_getSampler __________________
228 -----------------___________________________------------------
229 */
230
sfl_agent_getSampler(SFLAgent * agent,SFLDataSource_instance * pdsi)231 SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
232 {
233 SFLSampler *sm;
234
235 /* find it and return it */
236 for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
237 if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) return sm;
238 /* not found */
239 return NULL;
240 }
241
242 /*_________________---------------------------__________________
243 _________________ sfl_agent_getPoller __________________
244 -----------------___________________________------------------
245 */
246
sfl_agent_getPoller(SFLAgent * agent,SFLDataSource_instance * pdsi)247 SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
248 {
249 SFLPoller *pl;
250
251 /* find it and return it */
252 for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
253 if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) return pl;
254 /* not found */
255 return NULL;
256 }
257
258 /*_________________---------------------------__________________
259 _________________ sfl_agent_getReceiver __________________
260 -----------------___________________________------------------
261 */
262
sfl_agent_getReceiver(SFLAgent * agent,uint32_t receiverIndex)263 SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, uint32_t receiverIndex)
264 {
265 SFLReceiver *rcv;
266
267 uint32_t rcvIdx = 0;
268 for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt)
269 if(receiverIndex == ++rcvIdx) return rcv;
270
271 /* not found - ran off the end of the table */
272 return NULL;
273 }
274
275 /*_________________---------------------------__________________
276 _________________ sfl_agent_getNextSampler __________________
277 -----------------___________________________------------------
278 */
279
sfl_agent_getNextSampler(SFLAgent * agent,SFLDataSource_instance * pdsi)280 SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
281 {
282 /* return the one lexograpically just after it - assume they are sorted
283 correctly according to the lexographical ordering of the object ids */
284 SFLSampler *sm = sfl_agent_getSampler(agent, pdsi);
285 return sm ? sm->nxt : NULL;
286 }
287
288 /*_________________---------------------------__________________
289 _________________ sfl_agent_getNextPoller __________________
290 -----------------___________________________------------------
291 */
292
sfl_agent_getNextPoller(SFLAgent * agent,SFLDataSource_instance * pdsi)293 SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
294 {
295 /* return the one lexograpically just after it - assume they are sorted
296 correctly according to the lexographical ordering of the object ids */
297 SFLPoller *pl = sfl_agent_getPoller(agent, pdsi);
298 return pl ? pl->nxt : NULL;
299 }
300
301 /*_________________---------------------------__________________
302 _________________ sfl_agent_getNextReceiver __________________
303 -----------------___________________________------------------
304 */
305
sfl_agent_getNextReceiver(SFLAgent * agent,uint32_t receiverIndex)306 SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, uint32_t receiverIndex)
307 {
308 return sfl_agent_getReceiver(agent, receiverIndex + 1);
309 }
310
311
312 /*_________________---------------------------__________________
313 _________________ sfl_agent_resetReceiver __________________
314 -----------------___________________________------------------
315 */
316
sfl_agent_resetReceiver(SFLAgent * agent,SFLReceiver * receiver)317 void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver)
318 {
319 SFLReceiver *rcv;
320 SFLSampler *sm;
321 SFLPoller *pl;
322
323 /* tell samplers and pollers to stop sending to this receiver */
324 /* first get his receiverIndex */
325 uint32_t rcvIdx = 0;
326 for( rcv = agent->receivers; rcv != NULL; rcv = rcv->nxt) {
327 rcvIdx++; /* thanks to Diego Valverde for pointing out this bugfix */
328 if(rcv == receiver) {
329 /* now tell anyone that is using it to stop */
330 for( sm = agent->samplers; sm != NULL; sm = sm->nxt)
331 if(sfl_sampler_get_sFlowFsReceiver(sm) == rcvIdx) sfl_sampler_set_sFlowFsReceiver(sm, 0);
332
333 for( pl = agent->pollers; pl != NULL; pl = pl->nxt)
334 if(sfl_poller_get_sFlowCpReceiver(pl) == rcvIdx) sfl_poller_set_sFlowCpReceiver(pl, 0);
335
336 break;
337 }
338 }
339 }
340
341
342
343
344 /* ===================================================*/
345 /* ===================== SAMPLER =====================*/
346
347 /*_________________--------------------------__________________
348 _________________ sfl_sampler_init __________________
349 -----------------__________________________------------------
350 */
351
sfl_sampler_init(SFLSampler * sampler,SFLAgent * agent,SFLDataSource_instance * pdsi)352 void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi)
353 {
354 /* copy the dsi in case it points to sampler->dsi, which we are about to clear.
355 (Thanks to Jagjit Choudray of Force 10 Networks for pointing out this bug) */
356 SFLDataSource_instance dsi = *pdsi;
357
358 /* preserve the *nxt pointer too, in case we are resetting this poller and it is
359 already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
360 SFLSampler *nxtPtr = sampler->nxt;
361
362 /* clear everything */
363 memset(sampler, 0, sizeof(*sampler));
364
365 /* restore the linked list ptr */
366 sampler->nxt = nxtPtr;
367
368 /* now copy in the parameters */
369 sampler->agent = agent;
370 sampler->dsi = dsi;
371
372 /* set defaults */
373 sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, SFL_DEFAULT_HEADER_SIZE);
374 sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, SFL_DEFAULT_SAMPLING_RATE);
375 }
376
377 /*_________________--------------------------__________________
378 _________________ reset __________________
379 -----------------__________________________------------------
380 */
381
resetSampler(SFLSampler * sampler)382 static void resetSampler(SFLSampler *sampler)
383 {
384 SFLDataSource_instance dsi = sampler->dsi;
385 sfl_sampler_init(sampler, sampler->agent, &dsi);
386 }
387
388 /*_________________---------------------------__________________
389 _________________ MIB access __________________
390 -----------------___________________________------------------
391 */
sfl_sampler_get_sFlowFsReceiver(SFLSampler * sampler)392 uint32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler) {
393 return sampler->sFlowFsReceiver;
394 }
395
sfl_sampler_set_sFlowFsReceiver(SFLSampler * sampler,uint32_t sFlowFsReceiver)396 void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, uint32_t sFlowFsReceiver) {
397 sampler->sFlowFsReceiver = sFlowFsReceiver;
398 if(sFlowFsReceiver == 0) resetSampler(sampler);
399 else {
400 /* retrieve and cache a direct pointer to my receiver */
401 sampler->myReceiver = sfl_agent_getReceiver(sampler->agent, sampler->sFlowFsReceiver);
402 }
403 }
404
sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler * sampler)405 uint32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler) {
406 return sampler->sFlowFsPacketSamplingRate;
407 }
408
sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler * sampler,uint32_t sFlowFsPacketSamplingRate)409 void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, uint32_t sFlowFsPacketSamplingRate) {
410 sampler->sFlowFsPacketSamplingRate = sFlowFsPacketSamplingRate;
411 /* initialize the skip count too */
412 sampler->skip = sFlowFsPacketSamplingRate ? sfl_random(sFlowFsPacketSamplingRate) : 0;
413 }
414
sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler * sampler)415 uint32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler) {
416 return sampler->sFlowFsMaximumHeaderSize;
417 }
418
sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler * sampler,uint32_t sFlowFsMaximumHeaderSize)419 void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, uint32_t sFlowFsMaximumHeaderSize) {
420 sampler->sFlowFsMaximumHeaderSize = sFlowFsMaximumHeaderSize;
421 }
422
423 /* call this to set a maximum samples-per-second threshold. If the sampler reaches this
424 threshold it will automatically back off the sampling rate. A value of 0 disables the
425 mechanism */
426
sfl_sampler_set_backoffThreshold(SFLSampler * sampler,uint32_t samplesPerSecond)427 void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, uint32_t samplesPerSecond) {
428 sampler->backoffThreshold = samplesPerSecond;
429 }
430
sfl_sampler_get_backoffThreshold(SFLSampler * sampler)431 uint32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler) {
432 return sampler->backoffThreshold;
433 }
434
sfl_sampler_get_samplesLastTick(SFLSampler * sampler)435 uint32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) {
436 return sampler->samplesLastTick;
437 }
438
439 /*_________________---------------------------------__________________
440 _________________ sequence number reset __________________
441 -----------------_________________________________------------------
442 Used by the agent to indicate a samplePool discontinuity
443 so that the sflow collector will know to ignore the next delta.
444 */
sfl_sampler_resetFlowSeqNo(SFLSampler * sampler)445 void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; }
446
447
448 /*_________________---------------------------__________________
449 _________________ sfl_sampler_tick __________________
450 -----------------___________________________------------------
451 */
452
sfl_sampler_tick(SFLSampler * sampler,time_t now)453 void sfl_sampler_tick(SFLSampler *sampler, time_t now)
454 {
455 if(sampler->backoffThreshold && sampler->samplesThisTick > sampler->backoffThreshold) {
456 /* automatic backoff. If using hardware sampling then this is where you have to */
457 /* call out to change the sampling rate and make sure that any other registers/variables */
458 /* that hold this value are updated. */
459 sampler->sFlowFsPacketSamplingRate *= 2;
460 }
461 sampler->samplesLastTick = sampler->samplesThisTick;
462 sampler->samplesThisTick = 0;
463 }
464
465
466
467 /*_________________------------------------------__________________
468 _________________ sfl_sampler_writeFlowSample __________________
469 -----------------______________________________------------------
470 */
471
sfl_sampler_writeFlowSample(SFLSampler * sampler,SFL_FLOW_SAMPLE_TYPE * fs)472 void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
473 {
474 if(fs == NULL) return;
475 sampler->samplesThisTick++;
476 /* increment the sequence number */
477 fs->sequence_number = ++sampler->flowSampleSeqNo;
478 /* copy the other header fields in */
479 #ifdef SFL_USE_32BIT_INDEX
480 fs->ds_class = SFL_DS_CLASS(sampler->dsi);
481 fs->ds_index = SFL_DS_INDEX(sampler->dsi);
482 #else
483 fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
484 #endif
485 /* the sampling rate may have been set already. */
486 if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
487 /* the samplePool may be maintained upstream too. */
488 if(fs->sample_pool == 0) fs->sample_pool = sampler->samplePool;
489 /* and the same for the drop event counter */
490 if(fs->drops == 0) fs->drops = sampler->dropEvents;
491 /* sent to my receiver */
492 if(sampler->myReceiver) sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
493 }
494
495 /*_________________-------------------------------------__________________
496 _________________ sfl_sampler_writeEncodedFlowSample __________________
497 -----------------_____________________________________------------------
498 */
499
sfl_sampler_writeEncodedFlowSample(SFLSampler * sampler,char * xdrBytes,uint32_t len)500 void sfl_sampler_writeEncodedFlowSample(SFLSampler *sampler, char *xdrBytes, uint32_t len)
501 {
502 SFL_FLOW_SAMPLE_TYPE fs;
503 memset(&fs, 0, sizeof(fs));
504 sampler->samplesThisTick++;
505 /* increment the sequence number */
506 fs.sequence_number = ++sampler->flowSampleSeqNo;
507 /* copy the other header fields in */
508 #ifdef SFL_USE_32BIT_INDEX
509 fs.ds_class = SFL_DS_CLASS(sampler->dsi);
510 fs.ds_index = SFL_DS_INDEX(sampler->dsi);
511 #else
512 fs.source_id = SFL_DS_DATASOURCE(sampler->dsi);
513 #endif
514 fs.sampling_rate = sampler->sFlowFsPacketSamplingRate;
515 fs.sample_pool = sampler->samplePool;
516 fs.drops = sampler->dropEvents;
517 if(sampler->myReceiver) sfl_receiver_writeEncodedFlowSample(sampler->myReceiver, &fs, xdrBytes, len);
518 }
519
520 /*_________________---------------------------__________________
521 _________________ sfl_random __________________
522 -----------------___________________________------------------
523 Gerhard's generator
524 */
525
526 static uint32_t SFLRandom = 1;
527
sfl_random(uint32_t lim)528 uint32_t sfl_random(uint32_t lim) {
529 SFLRandom = ((SFLRandom * 32719) + 3) % 32749;
530 return ((SFLRandom % lim) + 1);
531 }
532
sfl_random_init(uint32_t seed)533 void sfl_random_init(uint32_t seed) {
534 SFLRandom = seed;
535 }
536
sfl_sampler_next_skip(SFLSampler * sampler)537 uint32_t sfl_sampler_next_skip(SFLSampler *sampler) {
538 return sfl_random((2 * sampler->sFlowFsPacketSamplingRate) - 1);
539 }
540
541 /*_________________---------------------------__________________
542 _________________ sfl_sampler_takeSample __________________
543 -----------------___________________________------------------
544 */
545
sfl_sampler_takeSample(SFLSampler * sampler)546 int sfl_sampler_takeSample(SFLSampler *sampler)
547 {
548 /* increment the samplePool */
549 sampler->samplePool++;
550
551 if(unlikely(--sampler->skip == 0)) {
552 /* reached zero. Set the next skip and return true. */
553 sampler->skip = sfl_sampler_next_skip(sampler);
554 return 1;
555 }
556 return 0;
557 }
558
559
560
561 /* ===================================================*/
562 /* ===================== POLLER ======================*/
563
564 /*_________________--------------------------__________________
565 _________________ sfl_poller_init __________________
566 -----------------__________________________------------------
567 */
568
sfl_poller_init(SFLPoller * poller,SFLAgent * agent,SFLDataSource_instance * pdsi,void * magic,getCountersFn_t getCountersFn)569 void sfl_poller_init(SFLPoller *poller,
570 SFLAgent *agent,
571 SFLDataSource_instance *pdsi,
572 void *magic, /* ptr to pass back in getCountersFn() */
573 getCountersFn_t getCountersFn)
574 {
575 /* copy the dsi in case it points to poller->dsi, which we are about to clear */
576 SFLDataSource_instance dsi = *pdsi;
577
578 /* preserve the *nxt pointer too, in case we are resetting this poller and it is
579 already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
580 SFLPoller *nxtPtr = poller->nxt;
581
582 /* clear everything */
583 memset(poller, 0, sizeof(*poller));
584
585 /* restore the linked list ptr */
586 poller->nxt = nxtPtr;
587
588 /* now copy in the parameters */
589 poller->agent = agent;
590 poller->dsi = dsi; /* structure copy */
591 poller->magic = magic;
592 poller->getCountersFn = getCountersFn;
593 }
594
595 /*_________________--------------------------__________________
596 _________________ reset __________________
597 -----------------__________________________------------------
598 */
599
resetPoller(SFLPoller * poller)600 static void resetPoller(SFLPoller *poller)
601 {
602 SFLDataSource_instance dsi = poller->dsi;
603 sfl_poller_init(poller, poller->agent, &dsi, poller->magic, poller->getCountersFn);
604 }
605
606 /*_________________---------------------------__________________
607 _________________ MIB access __________________
608 -----------------___________________________------------------
609 */
sfl_poller_get_sFlowCpReceiver(SFLPoller * poller)610 uint32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller) {
611 return poller->sFlowCpReceiver;
612 }
613
sfl_poller_set_sFlowCpReceiver(SFLPoller * poller,uint32_t sFlowCpReceiver)614 void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, uint32_t sFlowCpReceiver) {
615 poller->sFlowCpReceiver = sFlowCpReceiver;
616 if(sFlowCpReceiver == 0) resetPoller(poller);
617 else {
618 /* retrieve and cache a direct pointer to my receiver */
619 poller->myReceiver = sfl_agent_getReceiver(poller->agent, poller->sFlowCpReceiver);
620 }
621 }
622
sfl_poller_get_sFlowCpInterval(SFLPoller * poller)623 uint32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) {
624 return (uint32_t)poller->sFlowCpInterval;
625 }
626
sfl_poller_set_sFlowCpInterval(SFLPoller * poller,uint32_t sFlowCpInterval)627 void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, uint32_t sFlowCpInterval) {
628 poller->sFlowCpInterval = sFlowCpInterval;
629 /* Set the countersCountdown to be a randomly selected value between 1 and
630 sFlowCpInterval. That way the counter polling would be desynchronised
631 (on a 200-port switch, polling all the counters in one second could be harmful). */
632 poller->countersCountdown = sFlowCpInterval ? sfl_random(sFlowCpInterval) : 0;
633 }
634
635 /*_________________---------------------------------__________________
636 _________________ sequence number reset __________________
637 -----------------_________________________________------------------
638 Used to indicate a counter discontinuity
639 so that the sflow collector will know to ignore the next delta.
640 */
sfl_poller_resetCountersSeqNo(SFLPoller * poller)641 void sfl_poller_resetCountersSeqNo(SFLPoller *poller) { poller->countersSampleSeqNo = 0; }
642
643 /*_________________---------------------------__________________
644 _________________ sfl_poller_tick __________________
645 -----------------___________________________------------------
646 */
647
sfl_poller_tick(SFLPoller * poller,time_t now)648 void sfl_poller_tick(SFLPoller *poller, time_t now)
649 {
650 if(poller->countersCountdown == 0) return; /* counters retrieval was not enabled */
651 if(poller->sFlowCpReceiver == 0) return;
652
653 if(--poller->countersCountdown == 0) {
654 if(poller->getCountersFn != NULL) {
655 /* call out for counters */
656 SFL_COUNTERS_SAMPLE_TYPE cs;
657 memset(&cs, 0, sizeof(cs));
658 poller->getCountersFn(poller->magic, poller, &cs);
659 /* this countersFn is expected to fill in some counter block elements */
660 /* and then call sfl_poller_writeCountersSample(poller, &cs); */
661 }
662 /* reset the countdown */
663 poller->countersCountdown = poller->sFlowCpInterval;
664 }
665 }
666
667 /*_________________---------------------------------__________________
668 _________________ sfl_poller_writeCountersSample __________________
669 -----------------_________________________________------------------
670 */
671
sfl_poller_writeCountersSample(SFLPoller * poller,SFL_COUNTERS_SAMPLE_TYPE * cs)672 void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs)
673 {
674 /* fill in the rest of the header fields, and send to the receiver */
675 cs->sequence_number = ++poller->countersSampleSeqNo;
676 #ifdef SFL_USE_32BIT_INDEX
677 cs->ds_class = SFL_DS_CLASS(poller->dsi);
678 cs->ds_index = SFL_DS_INDEX(poller->dsi);
679 #else
680 cs->source_id = SFL_DS_DATASOURCE(poller->dsi);
681 #endif
682 /* sent to my receiver */
683 if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs);
684 }
685
686
687
688
689
690 /* ===================================================*/
691 /* ===================== RECEIVER ====================*/
692
693 static void resetSampleCollector(SFLReceiver *receiver);
694 static void sendSample(SFLReceiver *receiver);
695 static void receiverError(SFLReceiver *receiver, char *errm);
696 static void putNet32(SFLReceiver *receiver, uint32_t val);
697 static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
698 static void putOpaque(SFLReceiver *receiver, char *val, int len);
699
700 /*_________________--------------------------__________________
701 _________________ sfl_receiver_init __________________
702 -----------------__________________________------------------
703 */
704
sfl_receiver_init(SFLReceiver * receiver,SFLAgent * agent)705 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
706 {
707 /* first clear everything */
708 memset(receiver, 0, sizeof(*receiver));
709
710 /* now copy in the parameters */
711 receiver->agent = agent;
712
713 /* set defaults */
714 receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
715 receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
716
717 /* prepare to receive the first sample */
718 resetSampleCollector(receiver);
719 }
720
721 /*_________________---------------------------__________________
722 _________________ reset __________________
723 -----------------___________________________------------------
724
725 called on timeout, or when owner string is cleared
726 */
727
resetReceiver(SFLReceiver * receiver)728 static void resetReceiver(SFLReceiver *receiver) {
729 /* ask agent to tell samplers and pollers to stop sending samples */
730 sfl_agent_resetReceiver(receiver->agent, receiver);
731 /* reinitialize */
732 sfl_receiver_init(receiver, receiver->agent);
733 }
734
735
736 /*_________________----------------------------------------_____________
737 _________________ MIB Vars _____________
738 -----------------________________________________________-------------
739 */
740
sfl_receiver_get_sFlowRcvrOwner(SFLReceiver * receiver)741 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
742 return receiver->sFlowRcvrOwner;
743 }
sfl_receiver_set_sFlowRcvrOwner(SFLReceiver * receiver,char * sFlowRcvrOwner)744 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
745 receiver->sFlowRcvrOwner = sFlowRcvrOwner;
746 if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
747 /* reset condition! owner string was cleared */
748 resetReceiver(receiver);
749 }
750 }
sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver * receiver)751 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
752 return receiver->sFlowRcvrTimeout;
753 }
sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver * receiver,time_t sFlowRcvrTimeout)754 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
755 receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
756 }
sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver * receiver)757 uint32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
758 return receiver->sFlowRcvrMaximumDatagramSize;
759 }
sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver * receiver,uint32_t sFlowRcvrMaximumDatagramSize)760 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, uint32_t sFlowRcvrMaximumDatagramSize) {
761 uint32_t mdz = sFlowRcvrMaximumDatagramSize;
762 if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
763 receiver->sFlowRcvrMaximumDatagramSize = mdz;
764 }
sfl_receiver_get_sFlowRcvrAddress(SFLReceiver * receiver)765 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
766 return &receiver->sFlowRcvrAddress;
767 }
sfl_receiver_set_sFlowRcvrAddress(SFLReceiver * receiver,SFLAddress * sFlowRcvrAddress)768 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
769 if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; /* structure copy */
770 }
sfl_receiver_get_sFlowRcvrPort(SFLReceiver * receiver)771 uint32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
772 return receiver->sFlowRcvrPort;
773 }
sfl_receiver_set_sFlowRcvrPort(SFLReceiver * receiver,uint32_t sFlowRcvrPort)774 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, uint32_t sFlowRcvrPort) {
775 receiver->sFlowRcvrPort = sFlowRcvrPort;
776 }
777
778 /*_________________---------------------------__________________
779 _________________ sfl_receiver_tick __________________
780 -----------------___________________________------------------
781 */
782
sfl_receiver_tick(SFLReceiver * receiver,time_t now)783 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
784 {
785 /* if there are any samples to send, flush them now */
786 if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
787 /* check the timeout */
788 if(receiver->sFlowRcvrTimeout && (uint32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
789 /* count down one tick and reset if we reach 0 */
790 if(--receiver->sFlowRcvrTimeout == 0) resetReceiver(receiver);
791 }
792 }
793
794 /*_________________-----------------------------__________________
795 _________________ receiver write utilities __________________
796 -----------------_____________________________------------------
797 */
798
put32(SFLReceiver * receiver,uint32_t val)799 static void put32(SFLReceiver *receiver, uint32_t val)
800 {
801 *receiver->sampleCollector.datap++ = val;
802 }
803
putNet32(SFLReceiver * receiver,uint32_t val)804 static void putNet32(SFLReceiver *receiver, uint32_t val)
805 {
806 *receiver->sampleCollector.datap++ = htonl(val);
807 }
808
putNet64(SFLReceiver * receiver,uint64_t val64)809 static void putNet64(SFLReceiver *receiver, uint64_t val64)
810 {
811 uint32_t *firstQuadPtr = receiver->sampleCollector.datap;
812 /* first copy the bytes in */
813 memcpy((byte_t *)firstQuadPtr, &val64, 8);
814 if(htonl(1) != 1) {
815 /* swap the bytes, and reverse the quads too */
816 uint32_t tmp = *receiver->sampleCollector.datap++;
817 *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
818 *receiver->sampleCollector.datap++ = htonl(tmp);
819 }
820 else receiver->sampleCollector.datap += 2;
821 }
822
put128(SFLReceiver * receiver,byte_t * val)823 static void put128(SFLReceiver *receiver, byte_t *val)
824 {
825 memcpy(receiver->sampleCollector.datap, val, 16);
826 receiver->sampleCollector.datap += 4;
827 }
828
putString(SFLReceiver * receiver,SFLString * s)829 static void putString(SFLReceiver *receiver, SFLString *s)
830 {
831 putNet32(receiver, s->len);
832 memcpy(receiver->sampleCollector.datap, s->str, s->len);
833 receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
834 }
835
stringEncodingLength(SFLString * s)836 static uint32_t stringEncodingLength(SFLString *s) {
837 /* answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary */
838 return 4 + (((s->len + 3) / 4) * 4);
839 }
840
putAddress(SFLReceiver * receiver,SFLAddress * addr)841 static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
842 {
843 /* encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error? */
844 if(addr->type == 0) {
845 putNet32(receiver, SFLADDRESSTYPE_IP_V4);
846 put32(receiver, 0);
847 }
848 else {
849 putNet32(receiver, addr->type);
850 if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
851 else put128(receiver, addr->address.ip_v6.addr);
852 }
853 }
854
putOpaque(SFLReceiver * receiver,char * val,int len)855 static void putOpaque(SFLReceiver *receiver, char *val, int len)
856 {
857 memcpy((char *)receiver->sampleCollector.datap, val, len);
858 receiver->sampleCollector.datap += ((len+3)/4);
859 }
860
httpOpEncodingLength(SFLSampled_http * op)861 static uint32_t httpOpEncodingLength(SFLSampled_http *op) {
862 uint32_t elemSiz = stringEncodingLength(&op->uri);
863 elemSiz += stringEncodingLength(&op->host);
864 elemSiz += stringEncodingLength(&op->referrer);
865 elemSiz += stringEncodingLength(&op->useragent);
866 elemSiz += stringEncodingLength(&op->xff);
867 elemSiz += stringEncodingLength(&op->authuser);
868 elemSiz += stringEncodingLength(&op->mimetype);
869 elemSiz += 32; /* method, protocol, req_bytes, resp_bytes, uS, status */
870 return elemSiz;
871 }
872
putSocket4(SFLReceiver * receiver,SFLExtended_socket_ipv4 * socket4)873 static void putSocket4(SFLReceiver *receiver, SFLExtended_socket_ipv4 *socket4) {
874 putNet32(receiver, socket4->protocol);
875 put32(receiver, socket4->local_ip.addr);
876 put32(receiver, socket4->remote_ip.addr);
877 putNet32(receiver, socket4->local_port);
878 putNet32(receiver, socket4->remote_port);
879 }
880
putSocket6(SFLReceiver * receiver,SFLExtended_socket_ipv6 * socket6)881 static void putSocket6(SFLReceiver *receiver, SFLExtended_socket_ipv6 *socket6) {
882 putNet32(receiver, socket6->protocol);
883 put128(receiver, socket6->local_ip.addr);
884 put128(receiver, socket6->remote_ip.addr);
885 putNet32(receiver, socket6->local_port);
886 putNet32(receiver, socket6->remote_port);
887 }
888
889
890 /*_________________-----------------------------__________________
891 _________________ computeFlowSampleSize __________________
892 -----------------_____________________________------------------
893 */
894
computeFlowSampleSize(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs)895 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
896 {
897 SFLFlow_sample_element *elem;
898 uint32_t elemSiz;
899 #ifdef SFL_USE_32BIT_INDEX
900 uint siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
901 sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
902 #else
903 uint32_t siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
904 sample_pool, drops, input, output, number of elements */
905 #endif
906
907 /* hard code the wire-encoding sizes, in case the structures are expanded to be 64-bit aligned */
908
909 fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
910 for(elem = fs->elements; elem != NULL; elem = elem->nxt) {
911 fs->num_elements++;
912 siz += 8; /* tag, length */
913 elemSiz = 0;
914 switch(elem->tag) {
915 case SFLFLOW_HTTP: elemSiz = httpOpEncodingLength(&elem->flowType.http); break;
916 case SFLFLOW_EX_SOCKET4: elemSiz = XDRSIZ_SFLEXTENDED_SOCKET4; break;
917 case SFLFLOW_EX_SOCKET6: elemSiz = XDRSIZ_SFLEXTENDED_SOCKET6; break;
918 default:
919 {
920 char errm[MAX_ERRMSG_LEN];
921 snprintf(errm, MAX_ERRMSG_LEN, "computeFlowSampleSize(): unexpected tag (%ud)", elem->tag);
922 receiverError(receiver, errm);
923 return -1;
924 }
925 break;
926 }
927 /* cache the element size, and accumulate it into the overall FlowSample size */
928 elem->length = elemSiz;
929 siz += elemSiz;
930 }
931
932 return siz;
933 }
934
935 /*_________________-------------------------------__________________
936 _________________ sfl_receiver_writeFlowSample __________________
937 -----------------_______________________________------------------
938 */
939
sfl_receiver_writeFlowSample(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs)940 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
941 {
942 int packedSize;
943 SFLFlow_sample_element *elem;
944 uint32_t encodingSize;
945
946 if(fs == NULL) return -1;
947 if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
948
949 /* check in case this one sample alone is too big for the datagram */
950 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize)) {
951 receiverError(receiver, "flow sample too big for datagram");
952 return -1;
953 }
954
955 /* if the sample pkt is full enough so that this sample might put */
956 /* it over the limit, then we should send it now before going on. */
957 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
958 sendSample(receiver);
959
960 receiver->sampleCollector.numSamples++;
961
962 #ifdef SFL_USE_32BIT_INDEX
963 putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
964 #else
965 putNet32(receiver, SFLFLOW_SAMPLE);
966 #endif
967
968 putNet32(receiver, packedSize - 8); /* don't include tag and len */
969 putNet32(receiver, fs->sequence_number);
970
971 #ifdef SFL_USE_32BIT_INDEX
972 putNet32(receiver, fs->ds_class);
973 putNet32(receiver, fs->ds_index);
974 #else
975 putNet32(receiver, fs->source_id);
976 #endif
977
978 putNet32(receiver, fs->sampling_rate);
979 putNet32(receiver, fs->sample_pool);
980 putNet32(receiver, fs->drops);
981
982 #ifdef SFL_USE_32BIT_INDEX
983 putNet32(receiver, fs->inputFormat);
984 putNet32(receiver, fs->input);
985 putNet32(receiver, fs->outputFormat);
986 putNet32(receiver, fs->output);
987 #else
988 putNet32(receiver, fs->input);
989 putNet32(receiver, fs->output);
990 #endif
991
992 putNet32(receiver, fs->num_elements);
993
994 for(elem = fs->elements; elem != NULL; elem = elem->nxt) {
995
996 putNet32(receiver, elem->tag);
997 putNet32(receiver, elem->length); /* length cached in computeFlowSampleSize() */
998
999 switch(elem->tag) {
1000 case SFLFLOW_EX_SOCKET4: putSocket4(receiver, &elem->flowType.socket4); break;
1001 case SFLFLOW_EX_SOCKET6: putSocket6(receiver, &elem->flowType.socket6); break;
1002 case SFLFLOW_HTTP:
1003 putNet32(receiver, elem->flowType.http.method);
1004 putNet32(receiver, elem->flowType.http.protocol);
1005 putString(receiver, &elem->flowType.http.uri);
1006 putString(receiver, &elem->flowType.http.host);
1007 putString(receiver, &elem->flowType.http.referrer);
1008 putString(receiver, &elem->flowType.http.useragent);
1009 putString(receiver, &elem->flowType.http.xff);
1010 putString(receiver, &elem->flowType.http.authuser);
1011 putString(receiver, &elem->flowType.http.mimetype);
1012 putNet64(receiver, elem->flowType.http.req_bytes);
1013 putNet64(receiver, elem->flowType.http.resp_bytes);
1014 putNet32(receiver, elem->flowType.http.uS);
1015 putNet32(receiver, elem->flowType.http.status);
1016 break;
1017 default:
1018 {
1019 char errm[MAX_ERRMSG_LEN];
1020 snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeFlowSample: unexpected tag (%ud)", elem->tag);
1021 receiverError(receiver, errm);
1022 return -1;
1023 }
1024 break;
1025 }
1026 }
1027
1028 /* sanity check */
1029 encodingSize = (byte_t *)receiver->sampleCollector.datap
1030 - (byte_t *)receiver->sampleCollector.data
1031 - receiver->sampleCollector.pktlen;
1032
1033 if(encodingSize != (uint32_t)packedSize) {
1034 char errm[MAX_ERRMSG_LEN];
1035 snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeFlowSample: encoding_size(%ud) != expected_size(%ud)",
1036 encodingSize,
1037 packedSize);
1038 receiverError(receiver, errm);
1039 return -1;
1040 }
1041
1042 /* update the pktlen */
1043 receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1044 return packedSize;
1045 }
1046
1047 /*_________________--------------------------------------__________________
1048 _________________ sfl_receiver_writeEncodedFlowSample __________________
1049 -----------------______________________________________------------------
1050 */
1051
sfl_receiver_writeEncodedFlowSample(SFLReceiver * receiver,SFL_FLOW_SAMPLE_TYPE * fs,char * xdrBytes,uint32_t packedSize)1052 int sfl_receiver_writeEncodedFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs, char *xdrBytes, uint32_t packedSize)
1053 {
1054 uint32_t encodingSize;
1055 uint32_t overrideEncodingSize;
1056 uint32_t xdrHdrStrip;
1057
1058 /* check in case this one sample alone is too big for the datagram */
1059 if(packedSize > receiver->sFlowRcvrMaximumDatagramSize) {
1060 receiverError(receiver, "flow sample too big for datagram");
1061 return -1;
1062 }
1063
1064 /* if the sample pkt is full enough so that this sample might put */
1065 /* it over the limit, then we should send it now before going on. */
1066 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
1067 sendSample(receiver);
1068
1069 receiver->sampleCollector.numSamples++;
1070
1071 #ifdef SFL_USE_32BIT_INDEX
1072 putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
1073 #else
1074 putNet32(receiver, SFLFLOW_SAMPLE);
1075 #endif
1076
1077 putNet32(receiver, packedSize - 8); /* don't include tag and len bytes in the length */
1078 putNet32(receiver, fs->sequence_number);
1079
1080 #ifdef SFL_USE_32BIT_INDEX
1081 putNet32(receiver, fs->ds_class);
1082 putNet32(receiver, fs->ds_index);
1083 #else
1084 putNet32(receiver, fs->source_id);
1085 #endif
1086
1087 putNet32(receiver, fs->sampling_rate);
1088 putNet32(receiver, fs->sample_pool);
1089 putNet32(receiver, fs->drops);
1090
1091 /* sanity check */
1092 overrideEncodingSize = (byte_t *)receiver->sampleCollector.datap
1093 - (byte_t *)receiver->sampleCollector.data
1094 - receiver->sampleCollector.pktlen;
1095
1096 #ifdef SFL_USE_32BIT_INDEX
1097 xdrHdrStrip = 32; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
1098 sample_pool, drops, [inputFormat, input, outputFormat, output, number of elements...] */
1099 #else
1100 xdrHdrStrip = 28; /* tag, length, sequence_number, source_id, sampling_rate,
1101 sample_pool, drops, [input, output, number of elements...] */
1102 #endif
1103
1104 memcpy(receiver->sampleCollector.datap, xdrBytes + xdrHdrStrip, packedSize - xdrHdrStrip);
1105 receiver->sampleCollector.datap += ((packedSize - xdrHdrStrip) >> 2);
1106
1107 /* sanity check */
1108 encodingSize = (byte_t *)receiver->sampleCollector.datap
1109 - (byte_t *)receiver->sampleCollector.data
1110 - receiver->sampleCollector.pktlen;
1111
1112 if(encodingSize != (uint32_t)packedSize) {
1113 char errm[MAX_ERRMSG_LEN];
1114 snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeEncodedFlowSample: encoding_size(%ud) != expected_size(%ud) [overrideEncodingSize=%ud xdrHeaderStrip=%ud pktlen=%ud]",
1115 encodingSize,
1116 packedSize,
1117 overrideEncodingSize,
1118 xdrHdrStrip,
1119 receiver->sampleCollector.pktlen);
1120 receiverError(receiver, errm);
1121 return -1;
1122 }
1123
1124 /* update the pktlen */
1125 receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1126 return packedSize;
1127 }
1128
1129 /*_________________-----------------------------__________________
1130 _________________ computeCountersSampleSize __________________
1131 -----------------_____________________________------------------
1132 */
1133
computeCountersSampleSize(SFLReceiver * receiver,SFL_COUNTERS_SAMPLE_TYPE * cs)1134 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
1135 {
1136 SFLCounters_sample_element *elem;
1137 uint32_t elemSiz;
1138
1139 #ifdef SFL_USE_32BIT_INDEX
1140 uint siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
1141 #else
1142 uint32_t siz = 20; /* tag, length, sequence_number, source_id, number of elements */
1143 #endif
1144
1145 cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
1146 for( elem = cs->elements; elem != NULL; elem = elem->nxt) {
1147 cs->num_elements++;
1148 siz += 8; /* tag, length */
1149 elemSiz = 0;
1150
1151 /* hard code the wire-encoding sizes rather than use sizeof() -- in case the
1152 structures are expanded to be 64-bit aligned */
1153
1154 switch(elem->tag) {
1155 case SFLCOUNTERS_HOST_PAR: elemSiz = 8 /*sizeof(elem->counterBlock.host_par)*/; break;
1156 case SFLCOUNTERS_HTTP: elemSiz = XDRSIZ_SFLHTTP_COUNTERS /*sizeof(elem->counterBlock.http)*/; break;
1157 default:
1158 {
1159 char errm[MAX_ERRMSG_LEN];
1160 snprintf(errm, MAX_ERRMSG_LEN, "computeCounterSampleSize(): unexpected counters tag (%ud)", elem->tag);
1161 receiverError(receiver, errm);
1162 return -1;
1163 }
1164 break;
1165 }
1166 /* cache the element size, and accumulate it into the overall FlowSample size */
1167 elem->length = elemSiz;
1168 siz += elemSiz;
1169 }
1170 return siz;
1171 }
1172
1173 /*_________________----------------------------------__________________
1174 _________________ sfl_receiver_writeCountersSample __________________
1175 -----------------__________________________________------------------
1176 */
1177
sfl_receiver_writeCountersSample(SFLReceiver * receiver,SFL_COUNTERS_SAMPLE_TYPE * cs)1178 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
1179 {
1180 int packedSize;
1181 SFLCounters_sample_element *elem;
1182 uint32_t encodingSize;
1183
1184 if(cs == NULL) return -1;
1185 /* if the sample pkt is full enough so that this sample might put */
1186 /* it over the limit, then we should send it now. */
1187 if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
1188
1189 /* check in case this one sample alone is too big for the datagram */
1190 /* in fact - if it is even half as big then we should ditch it. Very */
1191 /* important to avoid overruning the packet buffer. */
1192 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
1193 receiverError(receiver, "counters sample too big for datagram");
1194 return -1;
1195 }
1196
1197 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
1198 sendSample(receiver);
1199
1200 receiver->sampleCollector.numSamples++;
1201
1202 #ifdef SFL_USE_32BIT_INDEX
1203 putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
1204 #else
1205 putNet32(receiver, SFLCOUNTERS_SAMPLE);
1206 #endif
1207
1208 putNet32(receiver, packedSize - 8); /* tag and length not included */
1209 putNet32(receiver, cs->sequence_number);
1210
1211 #ifdef SFL_USE_32BIT_INDEX
1212 putNet32(receiver, cs->ds_class);
1213 putNet32(receiver, cs->ds_index);
1214 #else
1215 putNet32(receiver, cs->source_id);
1216 #endif
1217
1218 putNet32(receiver, cs->num_elements);
1219
1220 for(elem = cs->elements; elem != NULL; elem = elem->nxt) {
1221
1222 putNet32(receiver, elem->tag);
1223 putNet32(receiver, elem->length); /* length cached in computeCountersSampleSize() */
1224
1225 switch(elem->tag) {
1226 case SFLCOUNTERS_HOST_PAR:
1227 putNet32(receiver, elem->counterBlock.host_par.dsClass);
1228 putNet32(receiver, elem->counterBlock.host_par.dsIndex);
1229 break;
1230 case SFLCOUNTERS_HTTP:
1231 putNet32(receiver, elem->counterBlock.http.method_option_count);
1232 putNet32(receiver, elem->counterBlock.http.method_get_count);
1233 putNet32(receiver, elem->counterBlock.http.method_head_count);
1234 putNet32(receiver, elem->counterBlock.http.method_post_count);
1235 putNet32(receiver, elem->counterBlock.http.method_put_count);
1236 putNet32(receiver, elem->counterBlock.http.method_delete_count);
1237 putNet32(receiver, elem->counterBlock.http.method_trace_count);
1238 putNet32(receiver, elem->counterBlock.http.method_connect_count);
1239 putNet32(receiver, elem->counterBlock.http.method_other_count);
1240 putNet32(receiver, elem->counterBlock.http.status_1XX_count);
1241 putNet32(receiver, elem->counterBlock.http.status_2XX_count);
1242 putNet32(receiver, elem->counterBlock.http.status_3XX_count);
1243 putNet32(receiver, elem->counterBlock.http.status_4XX_count);
1244 putNet32(receiver, elem->counterBlock.http.status_5XX_count);
1245 putNet32(receiver, elem->counterBlock.http.status_other_count);
1246 break;
1247 default:
1248 {
1249 char errm[MAX_ERRMSG_LEN];
1250 snprintf(errm, MAX_ERRMSG_LEN, "unexpected counters tag (%ud)", elem->tag);
1251 receiverError(receiver, errm);
1252 return -1;
1253 }
1254 break;
1255 }
1256 }
1257 /* sanity check */
1258 encodingSize = (byte_t *)receiver->sampleCollector.datap
1259 - (byte_t *)receiver->sampleCollector.data
1260 - receiver->sampleCollector.pktlen;
1261 if(encodingSize != (uint32_t)packedSize) {
1262 char errm[MAX_ERRMSG_LEN];
1263 snprintf(errm, MAX_ERRMSG_LEN, "sfl_receiver_writeCountersSample: encoding_size(%ud) != expected_size(%ud)",
1264 encodingSize,
1265 packedSize);
1266 receiverError(receiver, errm);
1267 return -1;
1268 }
1269
1270 /* update the pktlen */
1271 receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1272 return packedSize;
1273 }
1274
1275 /*_________________---------------------------------__________________
1276 _________________ sfl_receiver_samplePacketsSent __________________
1277 -----------------_________________________________------------------
1278 */
1279
sfl_receiver_samplePacketsSent(SFLReceiver * receiver)1280 uint32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
1281 {
1282 return receiver->sampleCollector.packetSeqNo;
1283 }
1284
1285 /*_________________---------------------------__________________
1286 _________________ sendSample __________________
1287 -----------------___________________________------------------
1288 */
1289
sendSample(SFLReceiver * receiver)1290 static void sendSample(SFLReceiver *receiver)
1291 {
1292 /* construct and send out the sample, then reset for the next one... */
1293 SFLAgent *agent = receiver->agent;
1294
1295 /* go back and fill in the header */
1296 receiver->sampleCollector.datap = receiver->sampleCollector.data;
1297 putNet32(receiver, SFLDATAGRAM_VERSION5);
1298 putAddress(receiver, &agent->myIP);
1299 putNet32(receiver, agent->subId);
1300 putNet32(receiver, ++receiver->sampleCollector.packetSeqNo);
1301 putNet32(receiver, (uint32_t)((agent->now - agent->bootTime) * 1000));
1302 putNet32(receiver, receiver->sampleCollector.numSamples);
1303
1304 /* send */
1305 if(agent->sendFn) (*agent->sendFn)(agent->magic,
1306 agent,
1307 receiver,
1308 (byte_t *)receiver->sampleCollector.data,
1309 receiver->sampleCollector.pktlen);
1310
1311 /* reset for the next time */
1312 resetSampleCollector(receiver);
1313 }
1314
1315 /*_________________---------------------------__________________
1316 _________________ resetSampleCollector __________________
1317 -----------------___________________________------------------
1318 */
1319
resetSampleCollector(SFLReceiver * receiver)1320 static void resetSampleCollector(SFLReceiver *receiver)
1321 {
1322 receiver->sampleCollector.pktlen = 0;
1323 receiver->sampleCollector.numSamples = 0;
1324
1325 /* clear the buffer completely (ensures that pad bytes will always be zeros - thank you CW) */
1326 memset((byte_t *)receiver->sampleCollector.data, 0, (SFL_SAMPLECOLLECTOR_DATA_QUADS * 4));
1327
1328 /* point the datap to just after the header */
1329 receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
1330 (receiver->sampleCollector.data + 10) :
1331 (receiver->sampleCollector.data + 7);
1332
1333 /* start pktlen with the right value */
1334 receiver->sampleCollector.pktlen = (byte_t *)receiver->sampleCollector.datap - (byte_t *)receiver->sampleCollector.data;
1335 }
1336
1337 /*_________________---------------------------__________________
1338 _________________ receiverError __________________
1339 -----------------___________________________------------------
1340 */
1341
receiverError(SFLReceiver * receiver,char * msg)1342 static void receiverError(SFLReceiver *receiver, char *msg)
1343 {
1344 sfl_agent_error(receiver->agent, "receiver", msg);
1345 resetSampleCollector(receiver);
1346 }
1347
1348
1349 /*_________________---------------------------__________________
1350 _________________ exposure __________________
1351 -----------------___________________________------------------
1352 selective exposure of some internal hooks, just for this project
1353 */
1354
sfl_receiver_put32(SFLReceiver * receiver,uint32_t val)1355 void sfl_receiver_put32(SFLReceiver *receiver, uint32_t val) { put32(receiver, val); }
sfl_receiver_putOpaque(SFLReceiver * receiver,char * val,int len)1356 void sfl_receiver_putOpaque(SFLReceiver *receiver, char *val, int len) { putOpaque(receiver, val, len); }
sfl_receiver_resetSampleCollector(SFLReceiver * receiver)1357 void sfl_receiver_resetSampleCollector(SFLReceiver *receiver) { resetSampleCollector(receiver); }
1358