1 /*
2 ** Copyright (C) 2006-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /*
10 * ipfixsource.c
11 *
12 * This file and skipfix.c are tightly coupled, and together they
13 * read IPFIX records and convert them to SiLK flow records.
14 *
15 * This file is primarily about setting up and tearing down the data
16 * structures used when processing IPFIX.
17 *
18 * The skipfix.c file primarily handles the conversion, and it is
19 * where the reading functions exist.
20 */
21
22 #include <silk/silk.h>
23
24 RCSIDENT("$SiLK: ipfixsource.c 7af5eab585e4 2020-04-15 15:56:48Z mthomas $");
25
26 #include "ipfixsource.h"
27 #include <silk/redblack.h>
28 #include <silk/skthread.h>
29 #include <silk/skvector.h>
30 #include "infomodel.h"
31
32 #ifdef SKIPFIXSOURCE_TRACE_LEVEL
33 #define TRACEMSG_LEVEL SKIPFIXSOURCE_TRACE_LEVEL
34 #endif
35 #define TRACEMSG(lvl, msg) TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
36 #include <silk/sktracemsg.h>
37
38
39 /*
40 * IMPLEMENTATION NOTES
41 *
42 * Each probe is represented by a single skIPFIXSource_t object.
43 *
44 * For probes that process file-based IPFIX sources, the
45 * skIPFIXSource_t object contains an fBuf_t object. When the caller
46 * invokes skIPFIXSourceGetGeneric(), the next record is read from
47 * the fBuf_t and the record is returned. For consistency with
48 * network processing (described next), the file-based
49 * skIPFIXSource_t has an skIPFIXSourceBase_t object, but that object
50 * does little for file-based sources.
51 *
52 * For probes that process network-based IPFIX sources, the
53 * combination of the following four values must be unique: protocol,
54 * listen-on-port, listen-as-address, accept-from-host. (Note that
55 * an ADDR_ANY value for listen-as-address or accept-from-host
56 * matches all other addresses.)
57 *
58 * Each skIPFIXSource_t references an skIPFIXSourceBase_t object.
59 * Each unique listen-as-address/listen-to-port/protocol triple is
60 * handled by a single fbListener_t object, which is contained in the
61 * skIPFIXSourceBase_t object. When two skIPFIXSource_t's differ
62 * only by their accept-from-host addresses, the skIPFIXSource_t's
63 * reference the same skIPFIXSourceBase_t object. The
64 * skIPFIXSourceBase_t objects contain a reference-count. The
65 * skIPFIXSourceBase_t is destroyed when the last skIPFIXSource_t
66 * referring to it is destroyed.
67 *
68 * An skIPFIXConnection_t represents a connection, which is one of
69 * two things: In the TCP case, a connection is equivalent to a TCP
70 * connection. In the UDP case, a connection is a given set of IPFIX
71 * or NFv9 UDP packets sent from a given address, to a given address,
72 * on a given port, with a given domain ID. The skIPFIXConnection_t
73 * object is ipfixsource's way of mapping to the fbSession_t object
74 * in libfixbuf.
75 *
76 * There can be multiple active connections on a probe---consider a
77 * probe that collects from two machines that load-balance. In the
78 * code, this is represented by having each skIPFIXConnection_t
79 * object point to its skIPFIXSource_t. As described below, the
80 * skIPFIXConnection_t is stored as the context pointer on the
81 * libfixbuf fbCollector_t object.
82 *
83 * When a new TCP connection arrives or if a new UDP connection is
84 * seen and we are using a fixbuf that supports multi-UDP, the
85 * fixbufConnect() callback function first determines whether the
86 * peer is allowed to connect. If the peer is allowed, the function
87 * sets the context pointer for the fbCollector_t object to a new
88 * skIPFIXConnection_t object which contains statistics information
89 * for the connection and the skIPFIXSource_t object associated with
90 * the connection. These skIPFIXConnection_t objects are destroyed
91 * in the fixbufDisconnect() callback.
92 *
93 * When a new UDP peer sends data to the listener, the actual address
94 * is not known until the underlying recvmsg() call itself, rather
95 * than in an accept()-like call similar to TCP. What this means is
96 * that in this scenario the fixbufConnect() appInit function is not
97 * called until a call to fBufNext() or fBufNextCollectionTemplate()
98 * is called.
99 *
100 * There is a similar fixbufConnectUDP() function to handle UDP
101 * connections when libfixbuf does not support multi-UDP. However,
102 * the fundamental difference is this: TCP connections are associated
103 * with a new fbCollector_t at connection time. Non-multi-UDP
104 * connections are associated with a new fbCollector_t during the
105 * fbListenerAlloc() call.
106 *
107 * FIXBUF API ISSUE: The source objects connected to the
108 * fbCollector_t objects have to be passed to the
109 * fixbufConnect*() calls via global objects---newly created
110 * sources are put into a red-black tree; the call to
111 * fixbufConnect*() attempts to find the value in the red-black tree.
112 * It would have made more sense if fbListenerAlloc() took a
113 * caller-specified context pointer which would get passed to the
114 * fbListenerAppInit_fn() and fbListenerAppFree_fn() functions.
115 *
116 * There is one ipfix_reader() thread per skIPFIXSourceBase_t object.
117 * This thread loops around fbListenerWait() returning fBuf_t
118 * objects. The underlying skIPFIXConnection_t containing the source
119 * information is grabbed from the fBuf_t's collector. The
120 * fBufNext() is used to read the data from the fBuf_t and this data
121 * is associated with the given source by either inserting it into
122 * the source's circular buffer, or by adding the stats information
123 * to the source. Then we loop back determining any new connection
124 * and dealing with the next piece of data until the fBuf_t empties.
125 * We then return to fbListenerWait() to get the next fBuf_t.
126
127 * Since there is one thread per listener, if one source attached to
128 * a listener blocks due to the circular buffer becoming full, all
129 * sources attached to the listener will block as well. Solving this
130 * problem would involve more threads, and moving away from the
131 * fbListenerWait() method of doing things. We could instead have a
132 * separate thread per connection. This would require us to handle
133 * the connections (bind/listen/accept) ourselves, and then create
134 * fBufs from the resulting file descriptors.
135 */
136
137
138 /* LOCAL DEFINES AND TYPEDEFS */
139
140 /*
141 * Name of environment variable that, when set, causes SiLK to
142 * ignore any G_LOG_LEVEL_WARNING messages.
143 */
144 #define SK_ENV_FIXBUF_SUPPRESS_WARNING "SILK_LIBFIXBUF_SUPPRESS_WARNINGS"
145
/*
 *    SILK_PROTO_TO_FIXBUF_TRANSPORT(silk_proto, &fb_trans);
 *
 *    Set the fbTransport_t value in the memory referenced by
 *    'fb_trans' based on the SiLK protocol value 'silk_proto':
 *    SKPC_PROTO_SCTP maps to FB_SCTP, SKPC_PROTO_TCP to FB_TCP, and
 *    SKPC_PROTO_UDP to FB_UDP.  Any other value is passed to
 *    skAbortBadCase().
 *
 *    The expansion is wrapped in do { } while (0) so that the macro
 *    invocation followed by a semicolon is exactly one statement and
 *    may be used as the unbraced body of an if/else.
 */
#define SILK_PROTO_TO_FIXBUF_TRANSPORT(silk_proto, fb_trans)    \
    do {                                                        \
        switch (silk_proto) {                                   \
          case SKPC_PROTO_SCTP:                                 \
            *(fb_trans) = FB_SCTP;                              \
            break;                                              \
          case SKPC_PROTO_TCP:                                  \
            *(fb_trans) = FB_TCP;                               \
            break;                                              \
          case SKPC_PROTO_UDP:                                  \
            *(fb_trans) = FB_UDP;                               \
            break;                                              \
          default:                                              \
            skAbortBadCase(silk_proto);                         \
        }                                                       \
    } while (0)
166
167 /*
168 * The 'addr_to_source' member of 'skIPFIXSourceBase_t' is a
169 * red-black tree whose data members are 'peeraddr_source_t'
170 * objects. The tree is used when multiple sources listen on the
171 * same port and the accept-from-host addresses are used to choose
172 * the source based on the peer address of the sender.
173 *
174 * The 'addr_to_source' tree uses the peeraddr_compare() comparison
175 * function.
176 */
177 typedef struct peeraddr_source_st {
178 const sk_sockaddr_t *addr;
179 skIPFIXSource_t *source;
180 } peeraddr_source_t;
181
182
183 /* EXPORTED VARIABLE DEFINITIONS */
184
185 /* descriptions are in ipfixsource.h */
186
187 /* do the names of IE 48, 49, 50 follow fixbuf-1.x or 2.x? */
188 uint32_t sampler_flags = 0;
189
190
191 /* LOCAL VARIABLE DEFINITIONS */
192
193 /* Mutex around calls to skiCreateListener. */
194 static pthread_mutex_t create_listener_mutex = PTHREAD_MUTEX_INITIALIZER;
195
196 /* Mutex around listener_to_source_base tree and count. */
197 static pthread_mutex_t global_tree_mutex = PTHREAD_MUTEX_INITIALIZER;
198
199 /* Map from listeners to skIPFIXSourceBase_t objects. Objects in
200 * rbtree are skIPFIXSourceBase_t pointers. */
201 static struct rbtree *listener_to_source_base = NULL;
202
203 /* Number of ipfix sources (both networked and file-based) */
204 static uint32_t source_base_count = 0;
205
206
207 /*
208 * There is a single infomation model.
209 */
210 static fbInfoModel_t *ski_model = NULL;
211
212 /*
213 * When processing files with fixbuf, the session object
214 * (fbSession_t) is owned the reader/write buffer (fBuf_t).
215 *
216 * When doing network processing, the fBuf_t does not own the
217 * session. We use this global vector to maintain those session
218 * pointers so they can be freed at shutdown.
219 */
220 static sk_vector_t *session_list = NULL;
221
222
223
224 /* FUNCTION DEFINITIONS */
225
226 /*
227 * The listener_to_source_base_find() function is used as the
228 * comparison function for the listener_to_source_base red-black
229 * tree. Stores objects of type skIPFIXSourceBase_t, orders by
230 * fbListener_t pointer value.
231 */
232 static int
listener_to_source_base_find(const void * va,const void * vb,const void * ctx)233 listener_to_source_base_find(
234 const void *va,
235 const void *vb,
236 const void *ctx)
237 {
238 const fbListener_t *a = ((const skIPFIXSourceBase_t *)va)->listener;
239 const fbListener_t *b = ((const skIPFIXSourceBase_t *)vb)->listener;
240 SK_UNUSED_PARAM(ctx);
241
242 return ((a < b) ? -1 : (a > b));
243 }
244
245
246 /*
247 * The peeraddr_compare() function is used as the comparison
248 * function for the skIPFIXSourceBase_t's red-black tree,
249 * addr_to_source.
250 *
251 * The tree stores peeraddr_source_t objects, keyed by
252 * sk_sockaddr_t address of the accepted peers.
253 */
254 static int
peeraddr_compare(const void * va,const void * vb,const void * ctx)255 peeraddr_compare(
256 const void *va,
257 const void *vb,
258 const void *ctx)
259 {
260 const sk_sockaddr_t *a = ((const peeraddr_source_t *)va)->addr;
261 const sk_sockaddr_t *b = ((const peeraddr_source_t *)vb)->addr;
262 SK_UNUSED_PARAM(ctx);
263
264 return skSockaddrCompare(a, b, SK_SOCKADDRCOMP_NOPORT);
265 }
266
267
/*
 *    The pointer_cmp() function is used to compare skIPFIXConnection_t
 *    pointers in the 'connections' red-black tree on skIPFIXSource_t
 *    objects.
 *
 *    The pointers themselves are the keys: returns -1, 0, or 1 when
 *    'va' is less than, equal to, or greater than 'vb'.  'ctx' is
 *    unused.
 *
 *    The pointers refer to separately allocated objects, and applying
 *    '<' or '>' directly to such pointers is undefined in ISO C, so
 *    they are converted to uintptr_t before being compared.
 */
static int
pointer_cmp(
    const void         *va,
    const void         *vb,
    const void         *ctx)
{
    const uintptr_t a = (uintptr_t)va;
    const uintptr_t b = (uintptr_t)vb;

    (void)ctx;
    return ((a < b) ? -1 : (a > b));
}
282
283
284 /*
285 * The free_source() function is used to free an skIPFIXSource_t
286 * object. This only frees the object and its data, it does not
287 * mark up any connected skIPFIXSourceBase_t object in the
288 * process.
289 */
290 static void
free_source(skIPFIXSource_t * source)291 free_source(
292 skIPFIXSource_t *source)
293 {
294 TRACE_ENTRY;
295
296 if (source == NULL) {
297 TRACEMSG(3, ("source was null"));
298 TRACE_RETURN;
299 }
300
301 assert(source->connection_count == 0);
302
303 pthread_mutex_destroy(&source->stats_mutex);
304 if (source->circbuf) {
305 skCircBufDestroy(source->circbuf);
306 }
307 if (source->connections) {
308 rbdestroy(source->connections);
309 }
310 if (source->readbuf) {
311 TRACEMSG(3, ("freeing fbuf"));
312 fBufFree(source->readbuf);
313 }
314 if (source->fileptr.of_fp) {
315 TRACEMSG(3, ("closing file"));
316 skFileptrClose(&source->fileptr, &WARNINGMSG);
317 }
318 if (source->file_conn) {
319 TRACEMSG(3, ("freeing file_conn (%zu bytes)",
320 sizeof(source->file_conn)));
321 free(source->file_conn);
322 }
323
324 free(source);
325 TRACE_RETURN;
326 }
327
328
329 /*
330 * The fixbufConnect() function is passed to fbListenerAlloc() as
331 * its 'appinit' callback (fbListenerAppInit_fn).
332 * This function is called from within the fbListenerWait() call
333 * when a new connection to the listening socket is made. (In
334 * addition, for UDP sources, it is called directly by
335 * fbListenerAlloc() with a NULL peer.)
336 *
337 * Its primary purposes are to accept/reject the connection,
338 * create an skIPFIXConnection_t, and set the the collector's
339 * context to the skIPFIXConnection_t. The skIPFIXConnection_t
340 * remembers the peer information, contains the stats for this
341 * connection, and references the source object.
342 */
343 static gboolean
fixbufConnect(fbListener_t * listener,void ** ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)344 fixbufConnect(
345 fbListener_t *listener,
346 void **ctx,
347 int fd,
348 struct sockaddr *peer,
349 size_t peerlen,
350 GError **err)
351 {
352 fbCollector_t *collector;
353 char addr_buf[2 * SKIPADDR_STRLEN];
354 skIPFIXSourceBase_t target_base;
355 skIPFIXSourceBase_t *base;
356 const peeraddr_source_t *found_peer;
357 peeraddr_source_t target_peer;
358 skIPFIXSource_t *source;
359 skIPFIXConnection_t *conn = NULL;
360 sk_sockaddr_t addr;
361 gboolean retval = 0;
362
363 TRACE_ENTRY;
364 SK_UNUSED_PARAM(fd);
365
366 if (peer == NULL) {
367 /* This function is being called for a UDP listener at init
368 * time. Ignore this. */
369 TRACE_RETURN(1);
370 }
371 if (peerlen > sizeof(addr)) {
372 TRACEMSG(1, (("ipfixsource rejected connection:"
373 " peerlen too large: %" SK_PRIuZ " > %" SK_PRIuZ),
374 peerlen, sizeof(addr)));
375 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
376 ("peerlen unexpectedly large: %" SK_PRIuZ), peerlen);
377 TRACE_RETURN(0);
378 }
379
380 memcpy(&addr.sa, peer, peerlen);
381 skSockaddrString(addr_buf, sizeof(addr_buf), &addr);
382
383 TRACEMSG(3, (("ipfixsource processing connection from '%s'"), addr_buf));
384
385 /* Find the skIPFIXSourceBase_t object associated with this
386 * listener */
387 target_base.listener = listener;
388 pthread_mutex_lock(&global_tree_mutex);
389 base = ((skIPFIXSourceBase_t *)
390 rbfind(&target_base, listener_to_source_base));
391 pthread_mutex_unlock(&global_tree_mutex);
392 if (base == NULL) {
393 TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
394 " unable to find base given listener"), addr_buf));
395 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
396 "Unable to find base for listener");
397 TRACE_RETURN(0);
398 }
399
400 conn = (skIPFIXConnection_t*)calloc(1, sizeof(skIPFIXConnection_t));
401 if (conn == NULL) {
402 TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
403 " unable to allocate connection object"), addr_buf));
404 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
405 "Unable to allocate connection object");
406 TRACE_RETURN(0);
407 }
408
409 pthread_mutex_lock(&base->mutex);
410
411 if (base->any) {
412 /* When there is no accept-from address on the probe, there is
413 * a one-to-one mapping between source and base, and all
414 * connections are permitted. */
415 source = base->any;
416 } else {
417 /* Using the address of the incoming connection, search for
418 * the source object associated with this address. */
419 assert(base->addr_to_source);
420 target_peer.addr = &addr;
421 found_peer = ((const peeraddr_source_t*)
422 rbfind(&target_peer, base->addr_to_source));
423 if (NULL == found_peer) {
424 /* Reject hosts that do not appear in accept-from-host */
425 TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
426 " host prohibited"), addr_buf));
427 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
428 "Connection prohibited from %s", addr_buf);
429 free(conn);
430 goto END;
431 }
432 source = found_peer->source;
433 }
434
435 if (source->stopped) {
436 TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
437 " source is stopping"), addr_buf));
438 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
439 "Source is stopping");
440 free(conn);
441 goto END;
442 }
443
444 /* If this is an NetFlowV9/sFLow source, store the
445 * skIPFIXConnection_t in the red-black tree on the source so we
446 * can log about missing NetFlowV9/sFlow packets. */
447 if (source->connections) {
448 skIPFIXConnection_t *found_conn;
449
450 pthread_mutex_lock(&source->stats_mutex);
451 found_conn = ((skIPFIXConnection_t*)
452 rbsearch(conn, source->connections));
453 pthread_mutex_unlock(&source->stats_mutex);
454 if (found_conn != conn) {
455 TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
456 " unable to store connection on source"), addr_buf));
457 g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
458 "Unable to store connection on source");
459 free(conn);
460 goto END;
461 }
462 }
463
464 /* Update the skIPFIXConnection_t with the information necessary
465 * to provide a useful log message at disconnect. This info is
466 * also used to get NetFlowV9/sFlow missed packets. */
467 if (peerlen <= sizeof(conn->peer_addr)) {
468 memcpy(&conn->peer_addr.sa, peer, peerlen);
469 conn->peer_len = peerlen;
470 }
471
472 TRACEMSG(4, ("Creating new conn = %p for source = %p", conn, source));
473
474 /* Set the skIPFIXConnection_t to point to the source, increment
475 * the source's connection_count, and set the context pointer to
476 * the connection. */
477 conn->source = source;
478 ++source->connection_count;
479 retval = 1;
480 *ctx = conn;
481
482 /* Get the domain (also needed for NetFlowV9/sFlow missed pkts).
483 * In the TCP case, the collector does not exist yet, and the
484 * GetCollector call returns false. In the UDP-IPFIX case, the
485 * domain of the collector always returns 0. */
486 if (source->connections
487 && fbListenerGetCollector(listener, &collector, NULL))
488 {
489 conn->ob_domain = fbCollectorGetObservationDomain(collector);
490 INFOMSG("'%s': accepted connection from %s, domain %#06x",
491 source->name, addr_buf, conn->ob_domain);
492 } else {
493 INFOMSG("'%s': accepted connection from %s",
494 source->name, addr_buf);
495 }
496
497 END:
498 pthread_mutex_unlock(&base->mutex);
499 TRACE_RETURN(retval);
500 }
501
502
503 /*
504 * The fixbufDisconnect() function is passed to fbListenerAlloc()
505 * as its 'appfree' callback (fbListenerAppFree_fn). This
506 * function is called by fBufFree(). The argument to this
507 * function is the context (the skIPFIXConnection_t) that was set
508 * by fixbufConnect().
509 *
510 * The function decrefs the source and frees it if the
511 * connection_count hits zero and the source has been asked to be
512 * destroyed. It then frees the connection object.
513 */
514 static void
fixbufDisconnect(void * ctx)515 fixbufDisconnect(
516 void *ctx)
517 {
518 skIPFIXConnection_t *conn = (skIPFIXConnection_t *)ctx;
519
520 TRACE_ENTRY;
521
522 if (conn == NULL) {
523 TRACE_RETURN;
524 }
525
526 TRACEMSG(3, (("fixbufDisconnection connection_count = %" PRIu32),
527 conn->source->connection_count));
528
529 /* Remove the connection from the source. */
530 --conn->source->connection_count;
531 if (conn->source->connections) {
532 pthread_mutex_lock(&conn->source->stats_mutex);
533 rbdelete(conn, conn->source->connections);
534 pthread_mutex_unlock(&conn->source->stats_mutex);
535 }
536
537 /* For older fixbuf, only TCP connections contain the peer addr */
538 if (conn->peer_len) {
539 char addr_buf[2 * SKIPADDR_STRLEN];
540
541 skSockaddrString(addr_buf, sizeof(addr_buf), &conn->peer_addr);
542 if (conn->ob_domain) {
543 INFOMSG("'%s': noticed disconnect by %s, domain %#06x",
544 conn->source->name, addr_buf, conn->ob_domain);
545 } else {
546 INFOMSG("'%s': noticed disconnect by %s",
547 conn->source->name, addr_buf);
548 }
549 }
550
551 TRACEMSG(4, ("Destroying conn = %p for source %p", conn, conn->source));
552
553 /* Destroy it if this is the last reference to the source. */
554 if (conn->source->destroy && conn->source->connection_count == 0) {
555 free_source(conn->source);
556 }
557 free(conn);
558 TRACE_RETURN;
559 }
560
561
562 /*
563 * Return a pointer to the single information model. If necessary
564 * create and initialize it.
565 */
566 fbInfoModel_t *
skiInfoModel(void)567 skiInfoModel(
568 void)
569 {
570 fbInfoModel_t *m = ski_model;
571
572 if (!m) {
573 TRACEMSG(4, ("Allocating an info model"));
574 m = fbInfoModelAlloc();
575 /* call a function in infomodel.c to update the info model
576 * with the info elements defined in the .xml file(s) in the
577 * infomodel subdirectory */
578 infomodelAddGlobalElements(m);
579 ski_model = m;
580 }
581 return ski_model;
582 }
583
584 /*
585 * Free the single information model.
586 */
587 void
skiInfoModelFree(void)588 skiInfoModelFree(
589 void)
590 {
591 fbInfoModel_t *m = ski_model;
592
593 ski_model = NULL;
594 if (m) {
595 TRACEMSG(4, ("Freeing an info model"));
596 fbInfoModelFree(m);
597 }
598 }
599
600
601 /**
602 * Free the memory associated with the Info Model---note that doing
603 * so is not tread safe.
604 */
605 void
skiTeardown(void)606 skiTeardown(
607 void)
608 {
609 size_t i;
610 fbSession_t *session;
611
612 if (session_list) {
613 for (i = 0; i < skVectorGetCount(session_list); i++) {
614 skVectorGetValue(&session, session_list, i);
615 fbSessionFree(session);
616 }
617 skVectorDestroy(session_list);
618 session_list = NULL;
619 }
620
621 skiInfoModelFree();
622 }
623
624
625 /**
626 * Create an IPFIX Collecting Process listener.
627 */
628 static fbListener_t *
skiCreateListener(skIPFIXSourceBase_t * base,GError ** err)629 skiCreateListener(
630 skIPFIXSourceBase_t *base,
631 GError **err)
632 {
633 fbSession_t *session;
634 fbListener_t *listener;
635 int created_vec = 0;
636
637 TRACE_ENTRY;
638
639 ASSERT_MUTEX_LOCKED(&create_listener_mutex);
640
641 /* The session is not owned by the buffer or the listener, so
642 * maintain a vector of them for later destruction. */
643 if (!session_list) {
644 session_list = skVectorNew(sizeof(fbSession_t *));
645 if (session_list == NULL) {
646 TRACE_RETURN(NULL);
647 }
648 created_vec = 1;
649 }
650 /* fixbuf (glib) exits on allocation error */
651 session = fbSessionAlloc(skiInfoModel());
652
653 /* Initialize session for reading */
654 if (!skiSessionInitReader(session, err)) {
655 goto ERROR;
656 }
657 if (skVectorAppendValue(session_list, &session) != 0) {
658 goto ERROR;
659 }
660
661 /* Allocate a listener. 'fixbufConnect' is called on each
662 * collection attempt; vetoes connection attempts and creates
663 * application context. */
664 listener = fbListenerAlloc(base->connspec, session, fixbufConnect,
665 fixbufDisconnect, err);
666 TRACE_RETURN(listener);
667
668 ERROR:
669 fbSessionFree(session);
670 if (created_vec) {
671 skVectorDestroy(session_list);
672 session_list = NULL;
673 }
674 TRACE_RETURN(NULL);
675 }
676
677
678 /**
679 * Create a buffer pointer suitable for use for
680 * ski_fixrec_next(). The file pointer must be opened for reading.
681 */
682 static fBuf_t *
skiCreateReadBufferForFP(void * ctx,FILE * fp,GError ** err)683 skiCreateReadBufferForFP(
684 void *ctx,
685 FILE *fp,
686 GError **err)
687 {
688 fbSession_t *session;
689 fBuf_t *fbuf;
690
691 /* Allocate a session. The session will be owned by the fbuf, so
692 * don't save it for later freeing. */
693 session = fbSessionAlloc(skiInfoModel());
694
695 /* Initialize session for reading */
696 if (!skiSessionInitReader(session, err)) {
697 fbSessionFree(session);
698 return NULL;
699 }
700
701 /* Create a buffer with the session and a collector */
702 fbuf = fBufAllocForCollection(session, fbCollectorAllocFP(ctx, fp));
703
704 /* Make certain the fbuf has an internal template */
705 if (!fBufSetInternalTemplate(fbuf, SKI_YAFSTATS_TID, err)) {
706 fBufFree(fbuf);
707 return NULL;
708 }
709
710 return fbuf;
711 }
712
713
714
715
716 /*
717 * The free_connspec() function frees a fbConnSpec_t object.
718 */
719 static void
free_connspec(fbConnSpec_t * connspec)720 free_connspec(
721 fbConnSpec_t *connspec)
722 {
723 TRACE_ENTRY;
724
725 if (connspec->host) {
726 free(connspec->host);
727 }
728 if (connspec->svc) {
729 free(connspec->svc);
730 }
731 free(connspec);
732
733 TRACE_RETURN;
734 }
735
736
737 /*
738 * The ipfixSourceCreateBase() function allocates a new
739 * skIPFIXSourceBase_t object.
740 */
741 static skIPFIXSourceBase_t *
ipfixSourceCreateBase(void)742 ipfixSourceCreateBase(
743 void)
744 {
745 skIPFIXSourceBase_t *base;
746
747 TRACE_ENTRY;
748
749 base = (skIPFIXSourceBase_t*)calloc(1, sizeof(skIPFIXSourceBase_t));
750 if (base == NULL) {
751 TRACE_RETURN(NULL);
752 }
753
754 pthread_mutex_init(&base->mutex, NULL);
755 pthread_cond_init(&base->cond, NULL);
756
757 TRACE_RETURN(base);
758 }
759
760
761 /*
762 * The ipfixSourceCreateFromFile() function creates a new
763 * skIPFIXSource_t object and associated base object for a
764 * file-based IPFIX stream.
765 */
766 static skIPFIXSource_t *
ipfixSourceCreateFromFile(const skpc_probe_t * probe,const char * path_name)767 ipfixSourceCreateFromFile(
768 const skpc_probe_t *probe,
769 const char *path_name)
770 {
771 skIPFIXSourceBase_t *base = NULL;
772 skIPFIXSource_t *source = NULL;
773 GError *err = NULL;
774 int rv;
775
776 TRACE_ENTRY;
777
778 /* Create the base object */
779 base = ipfixSourceCreateBase();
780 if (base == NULL) {
781 goto ERROR;
782 }
783 pthread_mutex_lock(&global_tree_mutex);
784 ++source_base_count;
785 pthread_mutex_unlock(&global_tree_mutex);
786
787 /* Create the source object */
788 source = (skIPFIXSource_t*)calloc(1, sizeof(*source));
789 if (source == NULL) {
790 goto ERROR;
791 }
792
793 /* Open the file */
794 source->fileptr.of_name = path_name;
795 rv = skFileptrOpen(&source->fileptr, SK_IO_READ);
796 if (rv) {
797 ERRMSG("Unable to open file '%s': %s",
798 path_name, skFileptrStrerror(rv));
799 goto ERROR;
800 }
801 if (SK_FILEPTR_IS_PROCESS == source->fileptr.of_type) {
802 skAppPrintErr("Reading from gzipped files is not supported");
803 goto ERROR;
804 }
805
806 /* Attach the source and base objects */
807 source->base = base;
808 base->any = source;
809 ++base->source_count;
810
811 /* Set the source's name from the probe name */
812 source->probe = probe;
813 source->name = skpcProbeGetName(probe);
814
815 /* Create a connection object that points to the source, and store
816 * it on the source */
817 source->file_conn =
818 (skIPFIXConnection_t *)calloc(1, sizeof(skIPFIXConnection_t));
819 if (NULL == source->file_conn) {
820 goto ERROR;
821 }
822 source->file_conn->source = source;
823
824 /* Create a file-based fBuf_t for the source */
825 source->readbuf = skiCreateReadBufferForFP((void *)source->file_conn,
826 source->fileptr.of_fp, &err);
827 if (source->readbuf == NULL) {
828 if (err) {
829 ERRMSG("%s: %s", "skiCreateReadBufferForFP", err->message);
830 }
831 goto ERROR;
832 }
833
834 pthread_mutex_init(&source->stats_mutex, NULL);
835
836 TRACE_RETURN(source);
837
838 ERROR:
839 g_clear_error(&err);
840 if (source) {
841 if (NULL != source->fileptr.of_fp) {
842 skFileptrClose(&source->fileptr, &WARNINGMSG);
843 }
844 if (source->readbuf) {
845 fBufFree(source->readbuf);
846 }
847 free(source->file_conn);
848 free(source);
849 }
850 if (base) {
851 free(base);
852 pthread_mutex_lock(&global_tree_mutex);
853 --source_base_count;
854 if (0 == source_base_count) {
855 skiInfoModelFree();
856 }
857 pthread_mutex_unlock(&global_tree_mutex);
858 }
859 TRACE_RETURN(NULL);
860 }
861
862
863 /*
864 * Add the 'source' object to the 'base' object (or for an
865 * alternate view, have the 'source' wrap the 'base'). Return 0 on
866 * success, or -1 on failure.
867 */
868 static int
ipfixSourceBaseAddIPFIXSource(skIPFIXSourceBase_t * base,skIPFIXSource_t * source)869 ipfixSourceBaseAddIPFIXSource(
870 skIPFIXSourceBase_t *base,
871 skIPFIXSource_t *source)
872 {
873 const sk_sockaddr_array_t **accept_from;
874 peeraddr_source_t *peeraddr;
875 const peeraddr_source_t *found;
876 fbTransport_t transport;
877 uint32_t accept_from_count;
878 uint32_t i;
879 uint32_t j;
880 int rv = -1;
881
882 TRACE_ENTRY;
883
884 assert(base);
885 assert(source);
886 assert(source->probe);
887 assert(NULL == source->base);
888
889 accept_from_count = skpcProbeGetAcceptFromHost(source->probe,&accept_from);
890
891 /* Lock the base */
892 pthread_mutex_lock(&base->mutex);
893
894 /* Base must not be configured to accept packets from any host. */
895 if (base->any) {
896 goto END;
897 }
898 if (NULL == accept_from || 0 == accept_from_count) {
899 /* When no accept-from-host is specified, this source accepts
900 * packets from any address and there should be a one-to-one
901 * mapping between source and base */
902 if (base->addr_to_source) {
903 /* The base already references another source. */
904 goto END;
905 }
906 base->any = source;
907 source->base = base;
908 ++base->source_count;
909 } else {
910 /* Make sure the sources's protocol match the base's protocol */
911 SILK_PROTO_TO_FIXBUF_TRANSPORT(
912 skpcProbeGetProtocol(source->probe), &transport);
913 if (base->connspec->transport != transport) {
914 goto END;
915 }
916
917 /* Connect the base to the source */
918 source->base = base;
919
920 if (NULL == base->addr_to_source) {
921 base->addr_to_source = rbinit(peeraddr_compare, NULL);
922 if (base->addr_to_source == NULL) {
923 goto END;
924 }
925 }
926
927 /* Add a mapping on the base for each accept-from-host address
928 * on this source. */
929 for (j = 0; j < accept_from_count; ++j) {
930 for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
931 peeraddr = ((peeraddr_source_t*)
932 calloc(1, sizeof(peeraddr_source_t)));
933 if (peeraddr == NULL) {
934 goto END;
935 }
936 peeraddr->source = source;
937 peeraddr->addr = skSockaddrArrayGet(accept_from[j], i);
938 found = ((const peeraddr_source_t*)
939 rbsearch(peeraddr, base->addr_to_source));
940 if (found != peeraddr) {
941 if (found && (found->source == peeraddr->source)) {
942 /* Duplicate address, same connection */
943 free(peeraddr);
944 continue;
945 }
946 /* Memory error adding to tree */
947 free(peeraddr);
948 goto END;
949 }
950 }
951 }
952
953 ++base->source_count;
954 }
955
956 rv = 0;
957
958 END:
959 pthread_mutex_unlock(&base->mutex);
960 TRACE_RETURN(rv);
961 }
962
963
964 void
ipfixSourceBaseFreeListener(skIPFIXSourceBase_t * base)965 ipfixSourceBaseFreeListener(
966 skIPFIXSourceBase_t *base)
967 {
968 ASSERT_MUTEX_LOCKED(&base->mutex);
969
970 /* Remove this base object from the listener_to_source_base
971 * red-black tree */
972 pthread_mutex_lock(&global_tree_mutex);
973 rbdelete(base, listener_to_source_base);
974 pthread_mutex_unlock(&global_tree_mutex);
975
976 TRACEMSG(3, ("base %p calling fbListenerFree", base));
977
978 /* Destroy the fbListener_t object. This destroys the fbuf if the
979 * stream is UDP. */
980 fbListenerFree(base->listener);
981 base->listener = NULL;
982 }
983
984
985 /*
986 * Adds the skIPFIXSourceBase_t object 'base' to the global
987 * red-black tree of base objects, creating the tree if it does not
988 * exist. Returns 0 on success and -1 on failure.
989 */
990 static int
ipfixSourceBaseAddToGlobalList(skIPFIXSourceBase_t * base)991 ipfixSourceBaseAddToGlobalList(
992 skIPFIXSourceBase_t *base)
993 {
994 const void *rv;
995
996 pthread_mutex_lock(&global_tree_mutex);
997
998 if (listener_to_source_base == NULL) {
999 listener_to_source_base = rbinit(listener_to_source_base_find, NULL);
1000 if (listener_to_source_base == NULL) {
1001 pthread_mutex_unlock(&global_tree_mutex);
1002 return -1;
1003 }
1004 }
1005
1006 rv = rbsearch(base, listener_to_source_base);
1007 pthread_mutex_unlock(&global_tree_mutex);
1008
1009 if (base != rv) {
1010 if (NULL == rv) {
1011 CRITMSG("Out of memory");
1012 } else {
1013 CRITMSG("Duplicate listener created");
1014 }
1015 return -1;
1016 }
1017 return 0;
1018 }
1019
1020
#if 0
/*
 *    The following is #if 0'ed out because it fails to do what it
 *    is intended to do.
 *
 *    The issue appears to be that fixbuf and SiLK use different
 *    flags to getaddrinfo(), which changes the set of addresses
 *    that are returned.
 */
/*
 *    fixbuf does not return an error when it cannot bind to any
 *    listening address, which means the application can start
 *    correctly but not be actively listening.  The following code
 *    attempts to detect this situation before creating the fixbuf
 *    listener by binding to the port.
 *
 *    Every address in 'listen_address' is tried with a datagram
 *    socket.  All sockets opened here are closed again before the
 *    function returns.
 *
 *    Return 0 when able to successfully bind to at least one address
 *    or -1 otherwise (including allocation failure).
 */
static int
ipfixSourceBaseVerifyOpenPort(
    const sk_sockaddr_array_t  *listen_address)
{
    const sk_sockaddr_t *addr;
    char addr_name[PATH_MAX];
    int *sock_array;
    int *s;
    uint32_t i;
    uint16_t port = 0;

    /* 's' always points at the next unused slot of 'sock_array';
     * slots before 's' hold successfully bound sockets */
    s = sock_array = (int *)calloc(skSockaddrArrayGetSize(listen_address),
                                   sizeof(int));
    if (sock_array == NULL) {
        return -1;
    }

    DEBUGMSG(("Attempting to bind %" PRIu32 " addresses for %s"),
             skSockaddrArrayGetSize(listen_address),
             skSockaddrArrayGetHostPortPair(listen_address));
    for (i = 0; i < skSockaddrArrayGetSize(listen_address); ++i) {
        addr = skSockaddrArrayGet(listen_address, i);
        skSockaddrString(addr_name, sizeof(addr_name), addr);

        /* Get a socket */
        *s = socket(addr->sa.sa_family, SOCK_DGRAM, 0);
        if (-1 == *s) {
            DEBUGMSG("Skipping %s: Unable to create dgram socket: %s",
                     addr_name, strerror(errno));
            continue;
        }
        /* Bind socket to port */
        if (bind(*s, &addr->sa, skSockaddrLen(addr)) == -1) {
            DEBUGMSG("Skipping %s: Unable to bind: %s",
                     addr_name, strerror(errno));
            close(*s);
            *s = -1;
            continue;
        }
        DEBUGMSG("Bound %s for listening", addr_name);
        ++s;
        if (0 == port) {
            port = skSockaddrGetPort(addr);
        }
        assert(port == skSockaddrGetPort(addr));
    }

    if (s == sock_array) {
        /* nothing could be bound */
        ERRMSG("Failed to bind any addresses for %s",
               skSockaddrArrayGetHostPortPair(listen_address));
        free(sock_array);
        return -1;
    }
    DEBUGMSG(("Bound %" PRIu32 "/%" PRIu32 " addresses for %s"),
             (uint32_t)(s-sock_array),
             skSockaddrArrayGetSize(listen_address),
             skSockaddrArrayGetHostPortPair(listen_address));
    /* release every socket that was bound */
    while (s != sock_array) {
        --s;
        close(*s);
    }
    free(sock_array);
    return 0;
}
#endif  /* 0 */
1104
1105
1106 /*
1107 * Creates a IPFIX source listening on the network.
1108 *
1109 * 'probe' is the probe associated with the source. 'max_flows' is
1110 * the number of IPFIX flows the created source can buffer in
1111 * memory.
1112 *
1113 * Returns a IPFIX source on success, or NULL on failure.
1114 */
1115 static skIPFIXSource_t *
ipfixSourceCreateFromSockaddr(const skpc_probe_t * probe,uint32_t max_flows)1116 ipfixSourceCreateFromSockaddr(
1117 const skpc_probe_t *probe,
1118 uint32_t max_flows)
1119 {
1120 skIPFIXSource_t *source = NULL;
1121 skIPFIXSourceBase_t *localbase = NULL;
1122 skIPFIXSourceBase_t *base;
1123 const sk_sockaddr_array_t *listen_address;
1124 const sk_sockaddr_array_t **accept_from;
1125 peeraddr_source_t target;
1126 const peeraddr_source_t *found;
1127 skpc_proto_t protocol;
1128 GError *err = NULL;
1129 char port_string[7];
1130 uint32_t accept_from_count;
1131 uint32_t i;
1132 uint32_t j;
1133 int rv;
1134
1135 TRACE_ENTRY;
1136
1137 /* Check the protocol */
1138 protocol = skpcProbeGetProtocol(probe);
1139
1140 /* Get the list of accept-from-host addresses. */
1141 accept_from_count = skpcProbeGetAcceptFromHost(probe, &accept_from);
1142
1143 /* Get the listen address. */
1144 rv = skpcProbeGetListenOnSockaddr(probe, &listen_address);
1145 if (rv == -1) {
1146 goto ERROR;
1147 }
1148
1149 /* Check to see if there is an existing base object for that
1150 * listen address */
1151 pthread_mutex_lock(&global_tree_mutex);
1152 if (!listener_to_source_base) {
1153 base = NULL;
1154 } else {
1155 /* Loop through all current bases, and compare based on
1156 * listen_address and protocol */
1157 fbTransport_t transport;
1158 RBLIST *iter;
1159
1160 SILK_PROTO_TO_FIXBUF_TRANSPORT(protocol, &transport);
1161 iter = rbopenlist(listener_to_source_base);
1162 while ((base = (skIPFIXSourceBase_t *)rbreadlist(iter)) != NULL) {
1163 if (transport == base->connspec->transport
1164 && skSockaddrArrayMatches(base->listen_address,
1165 listen_address, 0))
1166 {
1167 /* Found a match. 'base' is now set to the matching
1168 * base */
1169 break;
1170 }
1171 }
1172 rbcloselist(iter);
1173 }
1174 pthread_mutex_unlock(&global_tree_mutex);
1175
1176 #if 0
1177 if (NULL == base) {
1178 if (ipfixSourceBaseVerifyOpenPort(listen_address)) {
1179 goto ERROR;
1180 }
1181 }
1182 #endif /* 0 */
1183
1184 /* if there is an existing base on this listen-address, compare
1185 * its accept-from settings with those on this probe */
1186 if (base) {
1187 if (accept_from == NULL) {
1188 /* The new listener wants to be promiscuous but another
1189 * listener already exists. */
1190 goto ERROR;
1191 }
1192 pthread_mutex_lock(&base->mutex);
1193 if (base->any) {
1194 /* Already have a listener, and it is promiscuous. */
1195 pthread_mutex_unlock(&base->mutex);
1196 goto ERROR;
1197 }
1198 /* Ensure the accept-from addresses are unique. */
1199 for (j = 0; j < accept_from_count; ++j) {
1200 for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
1201 target.addr = skSockaddrArrayGet(accept_from[j], i);
1202 found = ((const peeraddr_source_t*)
1203 rbfind(&target, base->addr_to_source));
1204 if (found != NULL) {
1205 pthread_mutex_unlock(&base->mutex);
1206 goto ERROR;
1207 }
1208 }
1209 }
1210 pthread_mutex_unlock(&base->mutex);
1211 }
1212
1213 /* Create a new source object */
1214 source = (skIPFIXSource_t *)calloc(1, sizeof(*source));
1215 if (source == NULL) {
1216 goto ERROR;
1217 }
1218
1219 /* Keep a handle to the probe and the probe's name */
1220 source->probe = probe;
1221 source->name = skpcProbeGetName(probe);
1222
1223 if (PROBE_ENUM_NETFLOW_V9 == skpcProbeGetType(probe)
1224 || PROBE_ENUM_SFLOW == skpcProbeGetType(probe))
1225 {
1226 /* Create the look-up table for skIPFIXConnection_t's */
1227 source->connections = rbinit(pointer_cmp, NULL);
1228 if (NULL == source->connections) {
1229 goto ERROR;
1230 }
1231 }
1232
1233 /* Create the circular buffer */
1234 if (skCircBufCreate(&source->circbuf, sizeof(rwRec), max_flows)) {
1235 goto ERROR;
1236 }
1237 /* Ready the first location in the circular buffer for writing */
1238 if (skCircBufGetWriterBlock(
1239 source->circbuf, &source->current_record, NULL))
1240 {
1241 skAbort();
1242 }
1243
1244 pthread_mutex_init(&source->stats_mutex, NULL);
1245
1246 if (base != NULL) {
1247 /* If there is an existing base, add the source to it. */
1248 if (ipfixSourceBaseAddIPFIXSource(base, source)) {
1249 goto ERROR;
1250 }
1251 } else {
1252 /* No existing base, create a new one */
1253
1254 /* Create the base object */
1255 base = localbase = ipfixSourceCreateBase();
1256 if (base == NULL) {
1257 goto ERROR;
1258 }
1259 pthread_mutex_lock(&global_tree_mutex);
1260 ++source_base_count;
1261 pthread_mutex_unlock(&global_tree_mutex);
1262
1263 /* Set the listen_address */
1264 base->listen_address = listen_address;
1265
1266 /* Create a connspec in order to create a listener */
1267 base->connspec = (fbConnSpec_t *)calloc(1, sizeof(*base->connspec));
1268 if (base->connspec == NULL) {
1269 goto ERROR;
1270 }
1271 if (skSockaddrArrayGetHostname(listen_address)
1272 != sk_sockaddr_array_anyhostname)
1273 {
1274 base->connspec->host
1275 = strdup(skSockaddrArrayGetHostname(listen_address));
1276 if (base->connspec->host == NULL) {
1277 goto ERROR;
1278 }
1279 }
1280 rv = snprintf(port_string, sizeof(port_string), "%i",
1281 skSockaddrGetPort(skSockaddrArrayGet(listen_address, 0)));
1282 assert((size_t)rv < sizeof(port_string));
1283 base->connspec->svc = strdup(port_string);
1284 if (base->connspec->svc == NULL) {
1285 goto ERROR;
1286 }
1287 SILK_PROTO_TO_FIXBUF_TRANSPORT(protocol, &base->connspec->transport);
1288
1289 /* Create the listener */
1290 pthread_mutex_lock(&create_listener_mutex);
1291 base->listener = skiCreateListener(base, &err);
1292 if (NULL == base->listener) {
1293 pthread_mutex_unlock(&create_listener_mutex);
1294 goto ERROR;
1295 }
1296 if (SKPC_PROTO_UDP == protocol) {
1297 fbCollector_t *collector;
1298
1299 if (!fbListenerGetCollector(base->listener, &collector, &err)) {
1300 pthread_mutex_unlock(&create_listener_mutex);
1301 goto ERROR;
1302 }
1303 /* Enable the multi-UDP support in libfixbuf. */
1304 fbCollectorSetUDPMultiSession(collector, 1);
1305
1306 #if !FIXBUF_CHECK_VERSION(2,0,0)
1307 /* Treat UDP streams from the same address but different
1308 * ports as different streams, in accordance with the
1309 * IPFIX/NetFlow v9 RFCs. */
1310 fbCollectorManageUDPStreamByPort(collector, TRUE);
1311 #endif /* FIXBUF_CHECK_VERSION */
1312
1313 /* If this is a Netflow v9 source or an sFlow source, tell
1314 * the collector. */
1315 switch (skpcProbeGetType(source->probe)) {
1316 case PROBE_ENUM_IPFIX:
1317 break;
1318 case PROBE_ENUM_NETFLOW_V9:
1319 if (!fbCollectorSetNetflowV9Translator(collector, &err)) {
1320 pthread_mutex_unlock(&create_listener_mutex);
1321 goto ERROR;
1322 }
1323 break;
1324 case PROBE_ENUM_SFLOW:
1325 if (!fbCollectorSetSFlowTranslator(collector, &err)) {
1326 pthread_mutex_unlock(&create_listener_mutex);
1327 goto ERROR;
1328 }
1329 break;
1330 default:
1331 skAbortBadCase(skpcProbeGetType(source->probe));
1332 }
1333 }
1334 pthread_mutex_unlock(&create_listener_mutex);
1335
1336 pthread_mutex_init(&base->mutex, NULL);
1337 pthread_cond_init(&base->cond, NULL);
1338
1339 /* add the source to the base */
1340 if (ipfixSourceBaseAddIPFIXSource(base, source)) {
1341 goto ERROR;
1342 }
1343
1344 /* Add base to list of bases, creating the list if needed */
1345 if (ipfixSourceBaseAddToGlobalList(base)) {
1346 goto ERROR;
1347 }
1348
1349 /* Start the listener thread */
1350 pthread_mutex_lock(&base->mutex);
1351 rv = skthread_create(skSockaddrArrayGetHostPortPair(listen_address),
1352 &base->thread, ipfix_reader, (void*)base);
1353 if (rv != 0) {
1354 pthread_mutex_unlock(&base->mutex);
1355 WARNINGMSG("Unable to spawn new thread for '%s': %s",
1356 skSockaddrArrayGetHostPortPair(listen_address),
1357 strerror(rv));
1358 goto ERROR;
1359 }
1360
1361 /* Wait for the thread to really begin */
1362 do {
1363 pthread_cond_wait(&base->cond, &base->mutex);
1364 } while (!base->started);
1365 pthread_mutex_unlock(&base->mutex);
1366 }
1367
1368 TRACE_RETURN(source);
1369
1370 ERROR:
1371 if (err) {
1372 ERRMSG("'%s': %s", source->name, err->message);
1373 }
1374 g_clear_error(&err);
1375 if (localbase) {
1376 if (localbase->listener) {
1377 fbListenerFree(localbase->listener);
1378 }
1379 if (localbase->connspec) {
1380 free_connspec(localbase->connspec);
1381 }
1382 if (localbase->addr_to_source) {
1383 rbdestroy(localbase->addr_to_source);
1384 }
1385 free(localbase);
1386 pthread_mutex_lock(&global_tree_mutex);
1387 --source_base_count;
1388 if (0 == source_base_count) {
1389 skiInfoModelFree();
1390 if (listener_to_source_base) {
1391 rbdestroy(listener_to_source_base);
1392 listener_to_source_base = NULL;
1393 }
1394 }
1395 pthread_mutex_unlock(&global_tree_mutex);
1396 }
1397 if (source) {
1398 if (source->circbuf) {
1399 skCircBufDestroy(source->circbuf);
1400 }
1401 if (source->connections) {
1402 rbdestroy(source->connections);
1403 }
1404 free(source);
1405 }
1406 TRACE_RETURN(NULL);
1407 }
1408
1409
1410 /*
1411 * Handler to print log messages. This will be invoked by g_log()
1412 * and the other logging functions from GLib2.
1413 */
1414 static void
ipfixGLogHandler(const gchar * log_domain,GLogLevelFlags log_level,const gchar * message,gpointer user_data)1415 ipfixGLogHandler(
1416 const gchar *log_domain,
1417 GLogLevelFlags log_level,
1418 const gchar *message,
1419 gpointer user_data)
1420 {
1421 /* In syslog, CRIT is worse than ERR; in Glib2 ERROR is worse than
1422 * CRITICAL. */
1423 SK_UNUSED_PARAM(log_domain);
1424 SK_UNUSED_PARAM(user_data);
1425
1426 switch (log_level & G_LOG_LEVEL_MASK) {
1427 case G_LOG_LEVEL_CRITICAL:
1428 ERRMSG("%s", message);
1429 break;
1430 case G_LOG_LEVEL_WARNING:
1431 WARNINGMSG("%s", message);
1432 break;
1433 case G_LOG_LEVEL_MESSAGE:
1434 NOTICEMSG("%s", message);
1435 break;
1436 case G_LOG_LEVEL_INFO:
1437 INFOMSG("%s", message);
1438 break;
1439 case G_LOG_LEVEL_DEBUG:
1440 DEBUGMSG("%s", message);
1441 break;
1442 default:
1443 CRITMSG("%s", message);
1444 break;
1445 }
1446 }
1447
1448 /*
1449 * GLib Log handler to discard messages.
1450 */
1451 static void
ipfixGLogHandlerVoid(const gchar * log_domain,GLogLevelFlags log_level,const gchar * message,gpointer user_data)1452 ipfixGLogHandlerVoid(
1453 const gchar *log_domain,
1454 GLogLevelFlags log_level,
1455 const gchar *message,
1456 gpointer user_data)
1457 {
1458 SK_UNUSED_PARAM(*log_domain);
1459 SK_UNUSED_PARAM(log_level);
1460 SK_UNUSED_PARAM(*message);
1461 SK_UNUSED_PARAM(user_data);
1462 return;
1463 }
1464
1465
/*
 *  ipfixSourceGlibInitialize();
 *
 *    Touch the GLib slice allocator once so that it initializes
 *    itself here.  There is no way to de-initialize the slice
 *    allocator, so valgrind reports its memory as "still-reachable";
 *    forcing the initialization in this function makes that report
 *    point at a well-known location instead of somewhere inside
 *    fixbuf.
 *
 *    This is a no-op unless GLib is version 2.10 or later in the 2.x
 *    series.
 */
static void
ipfixSourceGlibInitialize(
    void)
{
#if (GLIB_MAJOR_VERSION == 2 && GLIB_MINOR_VERSION >= 10)
#define MEMORY_SIZE 128
    /* allocate one small slice and hand it straight back */
    gpointer memory = g_slice_alloc(MEMORY_SIZE);

    g_slice_free1(MEMORY_SIZE, memory);
#endif
}
1487
1488
1489 /*
1490 * Performs any initialization required prior to creating the IPFIX
1491 * sources. Returns 0 on success, or -1 on failure.
1492 */
1493 int
skIPFIXSourcesSetup(void)1494 skIPFIXSourcesSetup(
1495 void)
1496 {
1497 const char *env;
1498 GLogLevelFlags log_levels = (GLogLevelFlags)(G_LOG_LEVEL_CRITICAL
1499 | G_LOG_LEVEL_WARNING
1500 | G_LOG_LEVEL_MESSAGE
1501 | G_LOG_LEVEL_INFO
1502 | G_LOG_LEVEL_DEBUG);
1503
1504 /* initialize the slice allocator */
1505 ipfixSourceGlibInitialize();
1506
1507 /* As of glib 2.32, g_thread_init() is deprecated. */
1508 #if (GLIB_MAJOR_VERSION == 2 && GLIB_MINOR_VERSION < 32)
1509 /* tell fixbuf (glib) we are a threaded program. this will abort
1510 * if glib does not have thread support. */
1511 if (!g_thread_supported()) {
1512 g_thread_init(NULL);
1513 }
1514 #endif
1515
1516 /* set a log handler for messages from glib, which we always want
1517 * to include in our log file.
1518 * http://developer.gnome.org/glib/stable/glib-Message-Logging.html */
1519 g_log_set_handler("GLib", log_levels, &ipfixGLogHandler, NULL);
1520
1521 /* set a log handler for messages from fixbuf, maybe using a void
1522 * handler for warnings. */
1523 env = getenv(SK_ENV_FIXBUF_SUPPRESS_WARNING);
1524 if (env && *env && 0 == strcmp("1", env)) {
1525 /* suppress warnings by setting a void handler */
1526 log_levels = (GLogLevelFlags)((unsigned int)log_levels
1527 & ~(unsigned int)G_LOG_LEVEL_WARNING);
1528 g_log_set_handler(
1529 NULL, G_LOG_LEVEL_WARNING, &ipfixGLogHandlerVoid, NULL);
1530 }
1531 g_log_set_handler(NULL, log_levels, &ipfixGLogHandler, NULL);
1532
1533 /* Determine which information elements should be used when
1534 * defining the NetFlow v9 Sampling template. */
1535 ski_nf9sampling_check_spec();
1536
1537 return 0;
1538 }
1539
1540
/*
 *    Counterpart to skIPFIXSourcesSetup(): releases global state by
 *    delegating to skiTeardown().
 */
void
skIPFIXSourcesTeardown(
    void)
{
    skiTeardown();
}
1550
1551
1552 /*
1553 * Creates a IPFIX source based on an skpc_probe_t.
1554 *
1555 * If the source is a network-based probe, this function also
1556 * starts the collection process.
1557 *
1558 * When creating a source from a network-based probe, the 'params'
1559 * union should have the 'max_pkts' member specify the maximum
1560 * number of packets to buffer in memory for this source.
1561 *
1562 * When creating a source from a probe that specifies either a file
1563 * or a directory that is polled for files, the 'params' union must
1564 * have the 'path_name' specify the full path of the file to
1565 * process.
1566 *
1567 * Return the new source, or NULL on error.
1568 */
1569 skIPFIXSource_t *
skIPFIXSourceCreate(const skpc_probe_t * probe,const skFlowSourceParams_t * params)1570 skIPFIXSourceCreate(
1571 const skpc_probe_t *probe,
1572 const skFlowSourceParams_t *params)
1573 {
1574 skIPFIXSource_t *source;
1575
1576 TRACE_ENTRY;
1577
1578 /* Check whether this is a file-based probe---either handles a
1579 * single file or files pulled from a directory poll */
1580 if (NULL != skpcProbeGetPollDirectory(probe)
1581 || NULL != skpcProbeGetFileSource(probe))
1582 {
1583 if (NULL == params->path_name) {
1584 TRACE_RETURN(NULL);
1585 }
1586 source = ipfixSourceCreateFromFile(probe, params->path_name);
1587
1588 } else {
1589 /* must be a network-based source */
1590 source = ipfixSourceCreateFromSockaddr(probe, params->max_pkts);
1591 }
1592
1593 TRACE_RETURN(source);
1594 }
1595
1596
1597 /*
1598 * Stops processing of packets. This will cause a call to any
1599 * skIPFIXSourceGetGeneric() function to stop blocking. Meant to
1600 * be used as a prelude to skIPFIXSourceDestroy() in threaded code.
1601 */
1602 void
skIPFIXSourceStop(skIPFIXSource_t * source)1603 skIPFIXSourceStop(
1604 skIPFIXSource_t *source)
1605 {
1606 TRACE_ENTRY;
1607
1608 assert(source);
1609
1610 /* Mark the source as stopped, and unblock the circular buffer */
1611 source->stopped = 1;
1612 if (source->circbuf) {
1613 skCircBufStop(source->circbuf);
1614 }
1615 TRACE_RETURN;
1616 }
1617
1618
1619 /*
1620 * Destroys an IPFIX source.
1621 */
1622 void
skIPFIXSourceDestroy(skIPFIXSource_t * source)1623 skIPFIXSourceDestroy(
1624 skIPFIXSource_t *source)
1625 {
1626 skIPFIXSourceBase_t *base;
1627 const sk_sockaddr_array_t **accept_from;
1628 peeraddr_source_t target;
1629 const peeraddr_source_t *found;
1630 uint32_t accept_from_count;
1631 uint32_t i;
1632 uint32_t j;
1633
1634 TRACE_ENTRY;
1635
1636 if (!source) {
1637 TRACE_RETURN;
1638 }
1639
1640 accept_from_count = skpcProbeGetAcceptFromHost(source->probe,&accept_from);
1641
1642 assert(source->base);
1643
1644 base = source->base;
1645
1646 pthread_mutex_lock(&base->mutex);
1647
1648 /* Remove the source from the red-black tree */
1649 if (base->addr_to_source && accept_from) {
1650 /* Remove the source's accept-from-host addresses from
1651 * base->addr_to_source */
1652 for (j = 0; j < accept_from_count; ++j) {
1653 for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
1654 target.addr = skSockaddrArrayGet(accept_from[j], i);
1655 found = ((const peeraddr_source_t*)
1656 rbdelete(&target, base->addr_to_source));
1657 if (found && (found->source == source)) {
1658 free((void*)found);
1659 }
1660 }
1661 }
1662 }
1663
1664 /* Stop the source */
1665 skIPFIXSourceStop(source);
1666
1667 /* If the source is not currently being referenced by an fBuf_t,
1668 * free it, otherwise mark it to be destroyed when the fBuf_t is
1669 * freed by fixbufDisconnect(). */
1670 if (source->connection_count == 0) {
1671 free_source(source);
1672 } else {
1673 source->destroy = 1;
1674 }
1675
1676 /* Decrement the source reference count */
1677 assert(base->source_count);
1678 --base->source_count;
1679
1680 TRACEMSG(3, ("base %p source_count is %u", base, base->source_count));
1681
1682 /* If this base object is still referenced by sources, return */
1683 if (base->source_count != 0) {
1684 pthread_mutex_unlock(&base->mutex);
1685 TRACE_RETURN;
1686 }
1687
1688 /* Otherwise, we must destroy the base stop its thread */
1689 base->destroyed = 1;
1690
1691 if (base->listener) {
1692 TRACEMSG(3, ("base %p calling fbListenerInterrupt", base));
1693
1694 /* Unblock the fbListenerWait() call */
1695 fbListenerInterrupt(base->listener);
1696
1697 /* Signal that the thread is to exit */
1698 pthread_cond_broadcast(&base->cond);
1699
1700 TRACEMSG(3, ("base %p waiting for running variable", base));
1701
1702 /* Wait for the thread to exit */
1703 while (base->running) {
1704 pthread_cond_wait(&base->cond, &base->mutex);
1705 }
1706
1707 TRACEMSG(3, ("base %p joining its thread", base));
1708
1709 /* Acknowledge that the thread has exited */
1710 pthread_join(base->thread, NULL);
1711
1712 assert(base->listener == NULL);
1713
1714 /* Free the connspec */
1715 free_connspec(base->connspec);
1716
1717 /* Destroy the red-black tree */
1718 if (base->addr_to_source) {
1719 rbdestroy(base->addr_to_source);
1720 }
1721
1722 pthread_cond_destroy(&base->cond);
1723
1724 pthread_mutex_unlock(&base->mutex);
1725 pthread_mutex_destroy(&base->mutex);
1726 }
1727
1728 TRACEMSG(3, ("base %p is free", base));
1729
1730 free(base);
1731
1732 pthread_mutex_lock(&global_tree_mutex);
1733 --source_base_count;
1734 if (0 == source_base_count) {
1735 /* When the last base is removed, destroy the global base
1736 * list, and call the teardown function for the libskipfix
1737 * library to free any global objects allocated there. */
1738 if (listener_to_source_base) {
1739 rbdestroy(listener_to_source_base);
1740 listener_to_source_base = NULL;
1741 }
1742 skiTeardown();
1743 }
1744 pthread_mutex_unlock(&global_tree_mutex);
1745 TRACE_RETURN;
1746 }
1747
1748
1749
1750
1751 /*
1752 * Requests a SiLK Flow record from the IPFIX source 'source'.
1753 *
1754 * This function will block if there are no IPFIX flows available
1755 * from which to create a SiLK Flow record.
1756 *
1757 * Returns 0 on success, -1 on failure.
1758 */
1759 int
skIPFIXSourceGetGeneric(skIPFIXSource_t * source,rwRec * rwrec)1760 skIPFIXSourceGetGeneric(
1761 skIPFIXSource_t *source,
1762 rwRec *rwrec)
1763 {
1764 rwRec *rec;
1765 int rv;
1766
1767 TRACE_ENTRY;
1768
1769 assert(source);
1770 assert(rwrec);
1771
1772 if (source->circbuf) {
1773 /* Reading from the circular buffer */
1774 if (skCircBufGetReaderBlock(source->circbuf, &rec, NULL)) {
1775 TRACE_RETURN(-1);
1776 }
1777 RWREC_COPY(rwrec, rec);
1778 TRACE_RETURN(0);
1779 }
1780
1781 rv = ipfixSourceGetRecordFromFile(source, rwrec);
1782 TRACE_RETURN(rv);
1783 }
1784
1785
1786
1787 /* Log statistics associated with a IPFIX source, and then clear the
1788 * statistics. */
1789 void
skIPFIXSourceLogStatsAndClear(skIPFIXSource_t * source)1790 skIPFIXSourceLogStatsAndClear(
1791 skIPFIXSource_t *source)
1792 {
1793 TRACE_ENTRY;
1794
1795 pthread_mutex_lock(&source->stats_mutex);
1796
1797 /* print log message giving the current statistics on the
1798 * skIPFIXSource_t pointer 'source' */
1799 {
1800 fbCollector_t *collector = NULL;
1801 GError *err = NULL;
1802
1803 if (source->saw_yafstats_pkt) {
1804 /* IPFIX from yaf: print the stats */
1805
1806 INFOMSG(("'%s': forward %" PRIu64
1807 ", reverse %" PRIu64
1808 ", ignored %" PRIu64
1809 "; yaf: recs %" PRIu64
1810 ", pkts %" PRIu64
1811 ", dropped-pkts %" PRIu64
1812 ", ignored-pkts %" PRIu64
1813 ", bad-sequence-pkts %" PRIu64
1814 ", expired-frags %" PRIu64),
1815 source->name,
1816 source->forward_flows,
1817 source->reverse_flows,
1818 source->ignored_flows,
1819 source->yaf_exported_flows,
1820 source->yaf_processed_packets,
1821 source->yaf_dropped_packets,
1822 source->yaf_ignored_packets,
1823 source->yaf_notsent_packets,
1824 source->yaf_expired_fragments);
1825
1826 } else if (!source->connections
1827 || !source->base
1828 || !source->base->listener)
1829 {
1830 /* no data or other IPFIX; print count of SiLK flows
1831 * created */
1832
1833 INFOMSG(("'%s': forward %" PRIu64
1834 ", reverse %" PRIu64
1835 ", ignored %" PRIu64),
1836 source->name,
1837 source->forward_flows,
1838 source->reverse_flows,
1839 source->ignored_flows);
1840
1841 } else if (!fbListenerGetCollector(source->base->listener,
1842 &collector, &err))
1843 {
1844 /* sFlow or NetFlowV9, but no collector */
1845
1846 DEBUGMSG("'%s': Unable to get collector for source: %s",
1847 source->name, err->message);
1848 g_clear_error(&err);
1849
1850 INFOMSG(("'%s': forward %" PRIu64
1851 ", reverse %" PRIu64
1852 ", ignored %" PRIu64),
1853 source->name,
1854 source->forward_flows,
1855 source->reverse_flows,
1856 source->ignored_flows);
1857
1858 } else {
1859 /* sFlow or NetFlowV9 */
1860 skIPFIXConnection_t *conn;
1861 RBLIST *iter;
1862 uint64_t prev;
1863
1864 iter = rbopenlist(source->connections);
1865 while ((conn = (skIPFIXConnection_t *)rbreadlist(iter)) != NULL) {
1866 /* store the previous number of dropped NF9/sFlow packets
1867 * and get the new number of dropped packets. */
1868 prev = conn->prev_yafstats.droppedPacketTotalCount;
1869 if (skpcProbeGetType(source->probe) == PROBE_ENUM_SFLOW) {
1870 conn->prev_yafstats.droppedPacketTotalCount
1871 = fbCollectorGetSFlowMissed(
1872 collector, &conn->peer_addr.sa, conn->peer_len,
1873 conn->ob_domain);
1874 } else {
1875 conn->prev_yafstats.droppedPacketTotalCount
1876 = fbCollectorGetNetflowMissed(
1877 collector, &conn->peer_addr.sa, conn->peer_len,
1878 conn->ob_domain);
1879 }
1880 if (prev > conn->prev_yafstats.droppedPacketTotalCount) {
1881 /* assume a new collector */
1882 TRACEMSG(4, (("Assuming new collector: NF9 loss dropped"
1883 " old = %" PRIu64 ", new = %" PRIu64),
1884 prev,
1885 conn->prev_yafstats.droppedPacketTotalCount));
1886 prev = 0;
1887 }
1888 source->yaf_dropped_packets
1889 += conn->prev_yafstats.droppedPacketTotalCount - prev;
1890 }
1891 rbcloselist(iter);
1892
1893 INFOMSG(("'%s': forward %" PRIu64
1894 ", reverse %" PRIu64
1895 ", ignored %" PRIu64
1896 ", %s: missing-pkts %" PRIu64),
1897 source->name,
1898 source->forward_flows,
1899 source->reverse_flows,
1900 source->ignored_flows,
1901 ((skpcProbeGetType(source->probe) == PROBE_ENUM_SFLOW)
1902 ? "sflow" : "nf9"),
1903 source->yaf_dropped_packets);
1904 }
1905 }
1906
1907 #if SOURCE_LOG_MAX_PENDING_WRITE
1908 if (skpcProbeGetLogFlags(source->probe) & SOURCE_LOG_MAX_PENDING_WRITE) {
1909 INFOMSG(("'%s': Maximum number of read records waiting to be written:"
1910 " %" PRIu32), source->name, source->max_pending);
1911 }
1912 #endif
1913
1914 /* reset (set to zero) statistics on the skIPFIXSource_t
1915 * 'source' */
1916 {
1917 source->yaf_dropped_packets = 0;
1918 source->yaf_ignored_packets = 0;
1919 source->yaf_notsent_packets = 0;
1920 source->yaf_expired_fragments = 0;
1921 source->yaf_processed_packets = 0;
1922 source->yaf_exported_flows = 0;
1923 source->forward_flows = 0;
1924 source->reverse_flows = 0;
1925 source->ignored_flows = 0;
1926 source->max_pending = 0;
1927 }
1928
1929 pthread_mutex_unlock(&source->stats_mutex);
1930 TRACE_RETURN;
1931 }
1932
1933
1934 /*
1935 ** Local Variables:
1936 ** mode:c
1937 ** indent-tabs-mode:nil
1938 ** c-basic-offset:4
1939 ** End:
1940 */
1941