1 /*
2 ** Copyright (C) 2006-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10  *  ipfixsource.c
11  *
12  *    This file and skipfix.c are tightly coupled, and together they
13  *    read IPFIX records and convert them to SiLK flow records.
14  *
15  *    This file is primary about setting up and tearing down the data
16  *    structures used when processing IPFIX.
17  *
18  *    The skipfix.c file primarly handles the conversion, and it is
19  *    where the reading functions exist.
20  */
21 
22 #include <silk/silk.h>
23 
24 RCSIDENT("$SiLK: ipfixsource.c 7af5eab585e4 2020-04-15 15:56:48Z mthomas $");
25 
26 #include "ipfixsource.h"
27 #include <silk/redblack.h>
28 #include <silk/skthread.h>
29 #include <silk/skvector.h>
30 #include "infomodel.h"
31 
32 #ifdef  SKIPFIXSOURCE_TRACE_LEVEL
33 #define TRACEMSG_LEVEL SKIPFIXSOURCE_TRACE_LEVEL
34 #endif
35 #define TRACEMSG(lvl, msg)    TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
36 #include <silk/sktracemsg.h>
37 
38 
39 /*
40  *  IMPLEMENTATION NOTES
41  *
42  *  Each probe is represented by a single skIPFIXSource_t object.
43  *
44  *  For probes that process file-based IPFIX sources, the
45  *  skIPFIXSource_t object contains an fBuf_t object.  When the caller
46  *  invokes skIPFIXSourceGetGeneric(), the next record is read from
47  *  the fBuf_t and the record is returned.  For consistency with
48  *  network processing (described next), the file-based
49  *  skIPFIXSource_t has an skIPFIXSourceBase_t object, but that object
50  *  does little for file-based sources.
51  *
52  *  For probes that process network-based IPFIX sources, the
53  *  combination of the following four values must be unique: protocol,
54  *  listen-on-port, listen-as-address, accept-from-host.  (Note that
55  *  an ADDR_ANY value for listen-as-address or accept-from-host
56  *  matches all other addresses.)
57  *
58  *  Each skIPFIXSource_t references an skIPFIXSourceBase_t object.
59  *  Each unique listen-as-address/listen-to-port/protocol triple is
60  *  handled by a single fbListener_t object, which is contained in the
61  *  skIPFIXSourceBase_t object.  When two skIPFIXSource_t's differ
62  *  only by their accept-from-host addreses, the skIPFIXSource_t's
63  *  reference the same skIPFIXSourceBase_t object.  The
64  *  skIPFIXSourceBase_t objects contain a reference-count.  The
65  *  skIPFIXSourceBase_t is destroyed when the last skIPFIXSource_t
66  *  referring to it is destroyed.
67  *
68  *  An skIPFIXConnection_t represents a connection, which is one of
69  *  two things: In the TCP case, a connection is equivalent to a TCP
70  *  connection.  In the UDP case, a connection is a given set of IPFIX
71  *  or NFv9 UDP packets sent from a given address, to a given address,
72  *  on a given port, with a given domain ID.  The skIPFIXConnection_t
73  *  object is ipfixsource's way of mapping to the fbSession_t object
74  *  in libfixbuf.
75  *
76  *  There can be multiple active connections on a probe---consider a
77  *  probe that collects from two machines that load-balance.  In the
78  *  code, this is represented by having each skIPFIXConnection_t
79  *  object point to its skIPFIXSource_t.  As described below, the
80  *  skIPFIXConnection_t is stored as the context pointer on the
81  *  libfixbuf fbCollector_t object.
82  *
83  *  When a new TCP connection arrives or if a new UDP connection is
84  *  seen and we are using a fixbuf that supports multi-UDP, the
85  *  fixbufConnect() callback function first determines whether the
86  *  peer is allowed to connect.  If the peer is allowed, the function
87  *  sets the context pointer for the fbCollector_t object to the a new
88  *  skIPFIXConnection_t object which contains statistics information
89  *  for the connection and the skIPFIXSource_t object associated with
90  *  the connection.  These skIPFIXConnection_t objects are destroyed
91  *  in the fixbufDisconnect() callback.
92  *
93  *  When a new UDP peer sends data to the listener, the actual address
94  *  is not known until the underlying recvmesg() call itself, rather
95  *  than in an accept()-like call similar to TCP.  What this means is
96  *  that in this scenario the fixbufConnect() appInit function is not
97  *  called until a call to fBufNext() or fBufNextCollectionTemplate()
98  *  is called.
99  *
100  *  There is a similar fixbufConnectUDP() function to handle UDP
101  *  connections when libfixbuf does not support multi-UDP.  However,
102  *  the fundamental difference is this: TCP connections are associated
103  *  with a new fbCollector_t at connection time.  Non-multi-UDP
104  *  connections are associated with a new fbCollector_t during the
105  *  fbListenerAlloc() call.
106  *
107  *  FIXBUF API ISSUE: The source objects connected to the
108  *  fbCollector_t objects have to be passed to the
109  *  fixbufConnect*() calls via global objects---newly created
110  *  sources are put into a red-black tree; the call to
111  *  fixbufConnect*() attempts to find the value in the red-black tree.
112  *  It would have made more sense if fbListenerAlloc() took a
113  *  caller-specified context pointer which would get passed to the
114  *  fbListenerAppInit_fn() and fbListenerAppFree_fn() functions.
115  *
116  *  There is one ipfix_reader() thread per skIPFIXSourceBase_t object.
117  *  This thread loops around fbListenerWait() returning fBuf_t
118  *  objects.  The underlying skIPFIXConnection_t containing the source
119  *  information is grabbed from the fBuf_t's collector.  The
120  *  fBufNext() is used to read the data from the fBuf_t and this data
121  *  is associated with the given source by either inserting it into
122  *  the source's circular buffer, or by adding the stats information
123  *  to the source.  Then we loop back determining any new connection
124  *  and dealing with the next piece of data until the fBuf_t empties.
125  *  We then return to fbListenerWait() to get the next fBuf_t.
126 
127  *  Since there is one thread per listener, if one source attached to
128  *  a listener blocks due to the circular buffer becoming full, all
129  *  sources attached to the listener will block as well.  Solving this
130  *  problem would involve more threads, and moving away from the
131  *  fbListenerWait() method of doing things.  We could instead have a
132  *  separate thread per connection.  This would require us to handle
133  *  the connections (bind/listen/accept) ourselves, and then create
134  *  fBufs from the resulting file descriptors.
135  */
136 
137 
138 /* LOCAL DEFINES AND TYPEDEFS */
139 
140 /*
141  *    Name of environment variable that, when set, cause SiLK to
142  *    ignore any G_LOG_LEVEL_WARNING messages.
143  */
144 #define SK_ENV_FIXBUF_SUPPRESS_WARNING "SILK_LIBFIXBUF_SUPPRESS_WARNINGS"
145 
146 /*
147  *  SILK_PROTO_TO_FIXBUF_TRANSPORT(silk_proto, &fb_trans);
148  *
149  *    Set the fbTransport_t value in the memory referenced by
150  *    'fb_trans' based on the SiLK protocol value 'silk_proto'.
151  */
152 #define SILK_PROTO_TO_FIXBUF_TRANSPORT(silk_proto, fb_trans)    \
153     switch (silk_proto) {                                       \
154       case SKPC_PROTO_SCTP:                                     \
155         *(fb_trans) = FB_SCTP;                                  \
156         break;                                                  \
157       case SKPC_PROTO_TCP:                                      \
158         *(fb_trans) = FB_TCP;                                   \
159         break;                                                  \
160       case SKPC_PROTO_UDP:                                      \
161         *(fb_trans) = FB_UDP;                                   \
162         break;                                                  \
163       default:                                                  \
164         skAbortBadCase(silk_proto);                             \
165     }
166 
167 /*
168  *    The 'addr_to_source' member of 'skIPFIXSourceBase_t' is a
169  *    red-black tree whose data members are 'peeraddr_source_t'
170  *    objects.  The tree is used when multiple sources listen on the
171  *    same port and the accept-from-host addresses are used to choose
172  *    the source based on the peer address of the sender.
173  *
174  *    The 'addr_to_source' tree uses the peeraddr_compare() comparison
175  *    function.
176  */
177 typedef struct peeraddr_source_st {
178     const sk_sockaddr_t *addr;
179     skIPFIXSource_t     *source;
180 } peeraddr_source_t;
181 
182 
183 /* EXPORTED VARIABLE DEFINITIONS */
184 
185 /* descriptions are in ipfixsource.h */
186 
187 /* do the names of IE 48, 49, 50 follow fixbuf-1.x or 2.x? */
188 uint32_t sampler_flags = 0;
189 
190 
191 /* LOCAL VARIABLE DEFINITIONS */
192 
193 /* Mutex around calls to skiCreateListener. */
194 static pthread_mutex_t create_listener_mutex = PTHREAD_MUTEX_INITIALIZER;
195 
196 /* Mutex around listener_to_source_base tree and count. */
197 static pthread_mutex_t global_tree_mutex = PTHREAD_MUTEX_INITIALIZER;
198 
199 /* Map from listeners to skIPFIXSourceBase_t objects.  Objects in
200  * rbtree are skIPFIXSourceBase_t pointers. */
201 static struct rbtree *listener_to_source_base = NULL;
202 
203 /* Number of ipfix sources (both networked and file-based) */
204 static uint32_t source_base_count = 0;
205 
206 
207 /*
208  *    There is a single infomation model.
209  */
210 static fbInfoModel_t *ski_model = NULL;
211 
212 /*
213  *    When processing files with fixbuf, the session object
214  *    (fbSession_t) is owned the reader/write buffer (fBuf_t).
215  *
216  *    When doing network processing, the fBuf_t does not own the
217  *    session.  We use this global vector to maintain those session
218  *    pointers so they can be freed at shutdown.
219  */
220 static sk_vector_t *session_list = NULL;
221 
222 
223 
224 /* FUNCTION DEFINITIONS */
225 
226 /*
227  *     The listener_to_source_base_find() function is used as the
228  *     comparison function for the listener_to_source_base red-black
229  *     tree.  Stores objects of type skIPFIXSourceBase_t, orders by
230  *     fbListener_t pointer value.
231  */
232 static int
listener_to_source_base_find(const void * va,const void * vb,const void * ctx)233 listener_to_source_base_find(
234     const void         *va,
235     const void         *vb,
236     const void         *ctx)
237 {
238     const fbListener_t *a = ((const skIPFIXSourceBase_t *)va)->listener;
239     const fbListener_t *b = ((const skIPFIXSourceBase_t *)vb)->listener;
240     SK_UNUSED_PARAM(ctx);
241 
242     return ((a < b) ? -1 : (a > b));
243 }
244 
245 
246 /*
247  *     The peeraddr_compare() function is used as the comparison
248  *     function for the skIPFIXSourceBase_t's red-black tree,
249  *     addr_to_source.
250  *
251  *     The tree stores peeraddr_source_t objects, keyed by
252  *     sk_sockaddr_t address of the accepted peers.
253  */
254 static int
peeraddr_compare(const void * va,const void * vb,const void * ctx)255 peeraddr_compare(
256     const void         *va,
257     const void         *vb,
258     const void         *ctx)
259 {
260     const sk_sockaddr_t *a = ((const peeraddr_source_t *)va)->addr;
261     const sk_sockaddr_t *b = ((const peeraddr_source_t *)vb)->addr;
262     SK_UNUSED_PARAM(ctx);
263 
264     return skSockaddrCompare(a, b, SK_SOCKADDRCOMP_NOPORT);
265 }
266 
267 
268 /*
269  *     The pointer_cmp() function is used compare skIPFIXConnection_t
270  *     pointers in the 'connections' red-black tree on skIPFIXSource_t
271  *     objects.
272  */
273 static int
pointer_cmp(const void * va,const void * vb,const void * ctx)274 pointer_cmp(
275     const void         *va,
276     const void         *vb,
277     const void         *ctx)
278 {
279     SK_UNUSED_PARAM(ctx);
280     return ((va < vb) ? -1 : (va > vb));
281 }
282 
283 
284 /*
285  *     The free_source() function is used to free an skIPFIXSource_t
286  *     object.  This only frees the object and its data, it does not
287  *     mark up any connected skIPFIXSourceBase_t object in the
288  *     process.
289  */
290 static void
free_source(skIPFIXSource_t * source)291 free_source(
292     skIPFIXSource_t    *source)
293 {
294     TRACE_ENTRY;
295 
296     if (source == NULL) {
297         TRACEMSG(3, ("source was null"));
298         TRACE_RETURN;
299     }
300 
301     assert(source->connection_count == 0);
302 
303     pthread_mutex_destroy(&source->stats_mutex);
304     if (source->circbuf) {
305         skCircBufDestroy(source->circbuf);
306     }
307     if (source->connections) {
308         rbdestroy(source->connections);
309     }
310     if (source->readbuf) {
311         TRACEMSG(3, ("freeing fbuf"));
312         fBufFree(source->readbuf);
313     }
314     if (source->fileptr.of_fp) {
315         TRACEMSG(3, ("closing file"));
316         skFileptrClose(&source->fileptr, &WARNINGMSG);
317     }
318     if (source->file_conn) {
319         TRACEMSG(3, ("freeing file_conn (%zu bytes)",
320                      sizeof(source->file_conn)));
321         free(source->file_conn);
322     }
323 
324     free(source);
325     TRACE_RETURN;
326 }
327 
328 
329 /*
330  *     The fixbufConnect() function is passed to fbListenerAlloc() as
331  *     its 'appinit' callback (fbListenerAppInit_fn).
332  *     This function is called from within the fbListenerWait() call
333  *     when a new connection to the listening socket is made.  (In
334  *     addition, for UDP sources, it is called directly by
335  *     fbListenerAlloc() with a NULL peer.)
336  *
337  *     Its primary purposes are to accept/reject the connection,
338  *     create an skIPFIXConnection_t, and set the the collector's
339  *     context to the skIPFIXConnection_t.  The skIPFIXConnection_t
340  *     remembers the peer information, contains the stats for this
341  *     connection, and references the source object.
342  */
343 static gboolean
fixbufConnect(fbListener_t * listener,void ** ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)344 fixbufConnect(
345     fbListener_t       *listener,
346     void              **ctx,
347     int                 fd,
348     struct sockaddr    *peer,
349     size_t              peerlen,
350     GError            **err)
351 {
352     fbCollector_t *collector;
353     char addr_buf[2 * SKIPADDR_STRLEN];
354     skIPFIXSourceBase_t target_base;
355     skIPFIXSourceBase_t *base;
356     const peeraddr_source_t *found_peer;
357     peeraddr_source_t target_peer;
358     skIPFIXSource_t *source;
359     skIPFIXConnection_t *conn = NULL;
360     sk_sockaddr_t addr;
361     gboolean retval = 0;
362 
363     TRACE_ENTRY;
364     SK_UNUSED_PARAM(fd);
365 
366     if (peer == NULL) {
367         /* This function is being called for a UDP listener at init
368          * time.  Ignore this. */
369         TRACE_RETURN(1);
370     }
371     if (peerlen > sizeof(addr)) {
372         TRACEMSG(1, (("ipfixsource rejected connection:"
373                       " peerlen too large: %" SK_PRIuZ " > %" SK_PRIuZ),
374                      peerlen, sizeof(addr)));
375         g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
376                     ("peerlen unexpectedly large: %" SK_PRIuZ), peerlen);
377         TRACE_RETURN(0);
378     }
379 
380     memcpy(&addr.sa, peer, peerlen);
381     skSockaddrString(addr_buf, sizeof(addr_buf), &addr);
382 
383     TRACEMSG(3, (("ipfixsource processing connection from '%s'"), addr_buf));
384 
385     /* Find the skIPFIXSourceBase_t object associated with this
386      * listener */
387     target_base.listener = listener;
388     pthread_mutex_lock(&global_tree_mutex);
389     base = ((skIPFIXSourceBase_t *)
390             rbfind(&target_base, listener_to_source_base));
391     pthread_mutex_unlock(&global_tree_mutex);
392     if (base == NULL) {
393         TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
394                       " unable to find base given listener"), addr_buf));
395         g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
396                     "Unable to find base for listener");
397         TRACE_RETURN(0);
398     }
399 
400     conn = (skIPFIXConnection_t*)calloc(1, sizeof(skIPFIXConnection_t));
401     if (conn == NULL) {
402         TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
403                       " unable to allocate connection object"), addr_buf));
404         g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
405                     "Unable to allocate connection object");
406         TRACE_RETURN(0);
407     }
408 
409     pthread_mutex_lock(&base->mutex);
410 
411     if (base->any) {
412         /* When there is no accept-from address on the probe, there is
413          * a one-to-one mapping between source and base, and all
414          * connections are permitted. */
415         source = base->any;
416     } else {
417         /* Using the address of the incoming connection, search for
418          * the source object associated with this address. */
419         assert(base->addr_to_source);
420         target_peer.addr = &addr;
421         found_peer = ((const peeraddr_source_t*)
422                       rbfind(&target_peer, base->addr_to_source));
423         if (NULL == found_peer) {
424             /* Reject hosts that do not appear in accept-from-host */
425             TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
426                           " host prohibited"), addr_buf));
427             g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
428                         "Connection prohibited from %s", addr_buf);
429             free(conn);
430             goto END;
431         }
432         source = found_peer->source;
433     }
434 
435     if (source->stopped) {
436         TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
437                       " source is stopping"), addr_buf));
438         g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
439                     "Source is stopping");
440         free(conn);
441         goto END;
442     }
443 
444     /* If this is an NetFlowV9/sFLow source, store the
445      * skIPFIXConnection_t in the red-black tree on the source so we
446      * can log about missing NetFlowV9/sFlow packets. */
447     if (source->connections) {
448         skIPFIXConnection_t *found_conn;
449 
450         pthread_mutex_lock(&source->stats_mutex);
451         found_conn = ((skIPFIXConnection_t*)
452                       rbsearch(conn, source->connections));
453         pthread_mutex_unlock(&source->stats_mutex);
454         if (found_conn != conn) {
455             TRACEMSG(1, (("ipfixsource rejected connection from '%s':"
456                           " unable to store connection on source"), addr_buf));
457             g_set_error(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN,
458                         "Unable to store connection on source");
459             free(conn);
460             goto END;
461         }
462     }
463 
464     /* Update the skIPFIXConnection_t with the information necessary
465      * to provide a useful log message at disconnect.  This info is
466      * also used to get NetFlowV9/sFlow missed packets. */
467     if (peerlen <= sizeof(conn->peer_addr)) {
468         memcpy(&conn->peer_addr.sa, peer, peerlen);
469         conn->peer_len = peerlen;
470     }
471 
472     TRACEMSG(4, ("Creating new conn = %p for source = %p", conn, source));
473 
474     /* Set the skIPFIXConnection_t to point to the source, increment
475      * the source's connection_count, and set the context pointer to
476      * the connection.  */
477     conn->source = source;
478     ++source->connection_count;
479     retval = 1;
480     *ctx = conn;
481 
482     /* Get the domain (also needed for NetFlowV9/sFlow missed pkts).
483      * In the TCP case, the collector does not exist yet, and the
484      * GetCollector call returns false.  In the UDP-IPFIX case, the
485      * domain of the collector always returns 0. */
486     if (source->connections
487         && fbListenerGetCollector(listener, &collector, NULL))
488     {
489         conn->ob_domain = fbCollectorGetObservationDomain(collector);
490         INFOMSG("'%s': accepted connection from %s, domain %#06x",
491                 source->name, addr_buf, conn->ob_domain);
492     } else {
493         INFOMSG("'%s': accepted connection from %s",
494                 source->name, addr_buf);
495     }
496 
497   END:
498     pthread_mutex_unlock(&base->mutex);
499     TRACE_RETURN(retval);
500 }
501 
502 
503 /*
504  *     The fixbufDisconnect() function is passed to fbListenerAlloc()
505  *     as its 'appfree' callback (fbListenerAppFree_fn).  This
506  *     function is called by fBufFree().  The argument to this
507  *     function is the context (the skIPFIXConnection_t) that was set
508  *     by fixbufConnect().
509  *
510  *     The function decrefs the source and frees it if the
511  *     connection_count hits zero and the source has been asked to be
512  *     destroyed.  It then frees the connection object.
513  */
514 static void
fixbufDisconnect(void * ctx)515 fixbufDisconnect(
516     void               *ctx)
517 {
518     skIPFIXConnection_t *conn = (skIPFIXConnection_t *)ctx;
519 
520     TRACE_ENTRY;
521 
522     if (conn == NULL) {
523         TRACE_RETURN;
524     }
525 
526     TRACEMSG(3, (("fixbufDisconnection connection_count = %" PRIu32),
527                  conn->source->connection_count));
528 
529     /* Remove the connection from the source. */
530     --conn->source->connection_count;
531     if (conn->source->connections) {
532         pthread_mutex_lock(&conn->source->stats_mutex);
533         rbdelete(conn, conn->source->connections);
534         pthread_mutex_unlock(&conn->source->stats_mutex);
535     }
536 
537     /* For older fixbuf, only TCP connections contain the peer addr */
538     if (conn->peer_len) {
539         char addr_buf[2 * SKIPADDR_STRLEN];
540 
541         skSockaddrString(addr_buf, sizeof(addr_buf), &conn->peer_addr);
542         if (conn->ob_domain) {
543             INFOMSG("'%s': noticed disconnect by %s, domain %#06x",
544                     conn->source->name, addr_buf, conn->ob_domain);
545         } else {
546             INFOMSG("'%s': noticed disconnect by %s",
547                     conn->source->name, addr_buf);
548         }
549     }
550 
551     TRACEMSG(4, ("Destroying conn = %p for source %p", conn, conn->source));
552 
553     /* Destroy it if this is the last reference to the source. */
554     if (conn->source->destroy && conn->source->connection_count == 0) {
555         free_source(conn->source);
556     }
557     free(conn);
558     TRACE_RETURN;
559 }
560 
561 
562 /*
563  *    Return a pointer to the single information model.  If necessary
564  *    create and initialize it.
565  */
566 fbInfoModel_t *
skiInfoModel(void)567 skiInfoModel(
568     void)
569 {
570     fbInfoModel_t *m = ski_model;
571 
572     if (!m) {
573         TRACEMSG(4, ("Allocating an info model"));
574         m = fbInfoModelAlloc();
575         /* call a function in infomodel.c to update the info model
576          * with the info elements defined in the .xml file(s) in the
577          * infomodel subdirectory */
578         infomodelAddGlobalElements(m);
579         ski_model = m;
580     }
581     return ski_model;
582 }
583 
584 /*
585  *    Free the single information model.
586  */
587 void
skiInfoModelFree(void)588 skiInfoModelFree(
589     void)
590 {
591     fbInfoModel_t *m = ski_model;
592 
593     ski_model = NULL;
594     if (m) {
595         TRACEMSG(4, ("Freeing an info model"));
596         fbInfoModelFree(m);
597     }
598 }
599 
600 
601 /**
602  *    Free the memory associated with the Info Model---note that doing
603  *    so is not tread safe.
604  */
605 void
skiTeardown(void)606 skiTeardown(
607     void)
608 {
609     size_t i;
610     fbSession_t *session;
611 
612     if (session_list) {
613         for (i = 0; i < skVectorGetCount(session_list); i++) {
614             skVectorGetValue(&session, session_list, i);
615             fbSessionFree(session);
616         }
617         skVectorDestroy(session_list);
618         session_list = NULL;
619     }
620 
621     skiInfoModelFree();
622 }
623 
624 
625 /**
626  *    Create an IPFIX Collecting Process listener.
627  */
628 static fbListener_t *
skiCreateListener(skIPFIXSourceBase_t * base,GError ** err)629 skiCreateListener(
630     skIPFIXSourceBase_t    *base,
631     GError                **err)
632 {
633     fbSession_t *session;
634     fbListener_t *listener;
635     int created_vec = 0;
636 
637     TRACE_ENTRY;
638 
639     ASSERT_MUTEX_LOCKED(&create_listener_mutex);
640 
641     /* The session is not owned by the buffer or the listener, so
642      * maintain a vector of them for later destruction. */
643     if (!session_list) {
644         session_list = skVectorNew(sizeof(fbSession_t *));
645         if (session_list == NULL) {
646             TRACE_RETURN(NULL);
647         }
648         created_vec = 1;
649     }
650     /* fixbuf (glib) exits on allocation error */
651     session = fbSessionAlloc(skiInfoModel());
652 
653     /* Initialize session for reading */
654     if (!skiSessionInitReader(session, err)) {
655         goto ERROR;
656     }
657     if (skVectorAppendValue(session_list, &session) != 0) {
658         goto ERROR;
659     }
660 
661     /* Allocate a listener.  'fixbufConnect' is called on each
662      * collection attempt; vetoes connection attempts and creates
663      * application context. */
664     listener = fbListenerAlloc(base->connspec, session, fixbufConnect,
665                                fixbufDisconnect, err);
666     TRACE_RETURN(listener);
667 
668   ERROR:
669     fbSessionFree(session);
670     if (created_vec) {
671         skVectorDestroy(session_list);
672         session_list = NULL;
673     }
674     TRACE_RETURN(NULL);
675 }
676 
677 
678 /**
679  *    Create a buffer pointer suitable for use for
680  *    ski_fixrec_next(). The file pointer must be opened for reading.
681  */
682 static fBuf_t *
skiCreateReadBufferForFP(void * ctx,FILE * fp,GError ** err)683 skiCreateReadBufferForFP(
684     void               *ctx,
685     FILE               *fp,
686     GError            **err)
687 {
688     fbSession_t    *session;
689     fBuf_t         *fbuf;
690 
691     /* Allocate a session.  The session will be owned by the fbuf, so
692      * don't save it for later freeing. */
693     session = fbSessionAlloc(skiInfoModel());
694 
695     /* Initialize session for reading */
696     if (!skiSessionInitReader(session, err)) {
697         fbSessionFree(session);
698         return NULL;
699     }
700 
701     /* Create a buffer with the session and a collector */
702     fbuf = fBufAllocForCollection(session, fbCollectorAllocFP(ctx, fp));
703 
704     /* Make certain the fbuf has an internal template */
705     if (!fBufSetInternalTemplate(fbuf, SKI_YAFSTATS_TID, err)) {
706         fBufFree(fbuf);
707         return NULL;
708     }
709 
710     return fbuf;
711 }
712 
713 
714 
715 
716 /*
717  *     The free_connspec() function frees a fbConnSpec_t object.
718  */
719 static void
free_connspec(fbConnSpec_t * connspec)720 free_connspec(
721     fbConnSpec_t       *connspec)
722 {
723     TRACE_ENTRY;
724 
725     if (connspec->host) {
726         free(connspec->host);
727     }
728     if (connspec->svc) {
729         free(connspec->svc);
730     }
731     free(connspec);
732 
733     TRACE_RETURN;
734 }
735 
736 
737 /*
738  *     The ipfixSourceCreateBase() function allocates a new
739  *     skIPFIXSourceBase_t object.
740  */
741 static skIPFIXSourceBase_t *
ipfixSourceCreateBase(void)742 ipfixSourceCreateBase(
743     void)
744 {
745     skIPFIXSourceBase_t *base;
746 
747     TRACE_ENTRY;
748 
749     base = (skIPFIXSourceBase_t*)calloc(1, sizeof(skIPFIXSourceBase_t));
750     if (base == NULL) {
751         TRACE_RETURN(NULL);
752     }
753 
754     pthread_mutex_init(&base->mutex, NULL);
755     pthread_cond_init(&base->cond, NULL);
756 
757     TRACE_RETURN(base);
758 }
759 
760 
761 /*
762  *     The ipfixSourceCreateFromFile() function creates a new
763  *     skIPFIXSource_t object and associated base object for a
764  *     file-based IPFIX stream.
765  */
766 static skIPFIXSource_t *
ipfixSourceCreateFromFile(const skpc_probe_t * probe,const char * path_name)767 ipfixSourceCreateFromFile(
768     const skpc_probe_t *probe,
769     const char         *path_name)
770 {
771     skIPFIXSourceBase_t *base   = NULL;
772     skIPFIXSource_t     *source = NULL;
773     GError              *err    = NULL;
774     int                  rv;
775 
776     TRACE_ENTRY;
777 
778     /* Create the base object */
779     base = ipfixSourceCreateBase();
780     if (base == NULL) {
781         goto ERROR;
782     }
783     pthread_mutex_lock(&global_tree_mutex);
784     ++source_base_count;
785     pthread_mutex_unlock(&global_tree_mutex);
786 
787     /* Create the source object */
788     source = (skIPFIXSource_t*)calloc(1, sizeof(*source));
789     if (source == NULL) {
790         goto ERROR;
791     }
792 
793     /* Open the file */
794     source->fileptr.of_name = path_name;
795     rv = skFileptrOpen(&source->fileptr, SK_IO_READ);
796     if (rv) {
797         ERRMSG("Unable to open file '%s': %s",
798                path_name, skFileptrStrerror(rv));
799         goto ERROR;
800     }
801     if (SK_FILEPTR_IS_PROCESS == source->fileptr.of_type) {
802         skAppPrintErr("Reading from gzipped files is not supported");
803         goto ERROR;
804     }
805 
806     /* Attach the source and base objects */
807     source->base = base;
808     base->any = source;
809     ++base->source_count;
810 
811     /* Set the source's name from the probe name */
812     source->probe = probe;
813     source->name = skpcProbeGetName(probe);
814 
815     /* Create a connection object that points to the source, and store
816      * it on the source */
817     source->file_conn =
818         (skIPFIXConnection_t *)calloc(1, sizeof(skIPFIXConnection_t));
819     if (NULL == source->file_conn) {
820         goto ERROR;
821     }
822     source->file_conn->source = source;
823 
824     /* Create a file-based fBuf_t for the source */
825     source->readbuf = skiCreateReadBufferForFP((void *)source->file_conn,
826                                                source->fileptr.of_fp, &err);
827     if (source->readbuf == NULL) {
828         if (err) {
829             ERRMSG("%s: %s", "skiCreateReadBufferForFP", err->message);
830         }
831         goto ERROR;
832     }
833 
834     pthread_mutex_init(&source->stats_mutex, NULL);
835 
836     TRACE_RETURN(source);
837 
838   ERROR:
839     g_clear_error(&err);
840     if (source) {
841         if (NULL != source->fileptr.of_fp) {
842             skFileptrClose(&source->fileptr, &WARNINGMSG);
843         }
844         if (source->readbuf) {
845             fBufFree(source->readbuf);
846         }
847         free(source->file_conn);
848         free(source);
849     }
850     if (base) {
851         free(base);
852         pthread_mutex_lock(&global_tree_mutex);
853         --source_base_count;
854         if (0 == source_base_count) {
855             skiInfoModelFree();
856         }
857         pthread_mutex_unlock(&global_tree_mutex);
858     }
859     TRACE_RETURN(NULL);
860 }
861 
862 
863 /*
864  *    Add the 'source' object to the 'base' object (or for an
865  *    alternate view, have the 'source' wrap the 'base').  Return 0 on
866  *    success, or -1 on failure.
867  */
868 static int
ipfixSourceBaseAddIPFIXSource(skIPFIXSourceBase_t * base,skIPFIXSource_t * source)869 ipfixSourceBaseAddIPFIXSource(
870     skIPFIXSourceBase_t *base,
871     skIPFIXSource_t     *source)
872 {
873     const sk_sockaddr_array_t **accept_from;
874     peeraddr_source_t *peeraddr;
875     const peeraddr_source_t *found;
876     fbTransport_t transport;
877     uint32_t accept_from_count;
878     uint32_t i;
879     uint32_t j;
880     int rv = -1;
881 
882     TRACE_ENTRY;
883 
884     assert(base);
885     assert(source);
886     assert(source->probe);
887     assert(NULL == source->base);
888 
889     accept_from_count = skpcProbeGetAcceptFromHost(source->probe,&accept_from);
890 
891     /* Lock the base */
892     pthread_mutex_lock(&base->mutex);
893 
894     /* Base must not be configured to accept packets from any host. */
895     if (base->any) {
896         goto END;
897     }
898     if (NULL == accept_from || 0 == accept_from_count) {
899         /* When no accept-from-host is specified, this source accepts
900          * packets from any address and there should be a one-to-one
901          * mapping between source and base */
902         if (base->addr_to_source) {
903             /* The base already references another source. */
904             goto END;
905         }
906         base->any = source;
907         source->base = base;
908         ++base->source_count;
909     } else {
910         /* Make sure the sources's protocol match the base's protocol */
911         SILK_PROTO_TO_FIXBUF_TRANSPORT(
912             skpcProbeGetProtocol(source->probe), &transport);
913         if (base->connspec->transport != transport) {
914             goto END;
915         }
916 
917         /* Connect the base to the source */
918         source->base = base;
919 
920         if (NULL == base->addr_to_source) {
921             base->addr_to_source = rbinit(peeraddr_compare, NULL);
922             if (base->addr_to_source == NULL) {
923                 goto END;
924             }
925         }
926 
927         /* Add a mapping on the base for each accept-from-host address
928          * on this source. */
929         for (j = 0; j < accept_from_count; ++j) {
930             for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
931                 peeraddr = ((peeraddr_source_t*)
932                             calloc(1, sizeof(peeraddr_source_t)));
933                 if (peeraddr == NULL) {
934                     goto END;
935                 }
936                 peeraddr->source = source;
937                 peeraddr->addr = skSockaddrArrayGet(accept_from[j], i);
938                 found = ((const peeraddr_source_t*)
939                          rbsearch(peeraddr, base->addr_to_source));
940                 if (found != peeraddr) {
941                     if (found && (found->source == peeraddr->source)) {
942                         /* Duplicate address, same connection */
943                         free(peeraddr);
944                         continue;
945                     }
946                     /* Memory error adding to tree */
947                     free(peeraddr);
948                     goto END;
949                 }
950             }
951         }
952 
953         ++base->source_count;
954     }
955 
956     rv = 0;
957 
958   END:
959     pthread_mutex_unlock(&base->mutex);
960     TRACE_RETURN(rv);
961 }
962 
963 
964 void
ipfixSourceBaseFreeListener(skIPFIXSourceBase_t * base)965 ipfixSourceBaseFreeListener(
966     skIPFIXSourceBase_t    *base)
967 {
968     ASSERT_MUTEX_LOCKED(&base->mutex);
969 
970     /* Remove this base object from the listener_to_source_base
971      * red-black tree */
972     pthread_mutex_lock(&global_tree_mutex);
973     rbdelete(base, listener_to_source_base);
974     pthread_mutex_unlock(&global_tree_mutex);
975 
976     TRACEMSG(3, ("base %p calling fbListenerFree", base));
977 
978     /* Destroy the fbListener_t object.  This destroys the fbuf if the
979      * stream is UDP. */
980     fbListenerFree(base->listener);
981     base->listener = NULL;
982 }
983 
984 
985 /*
986  *    Adds the skIPFIXSourceBase_t object 'base' to the global
987  *    red-black tree of base objects, creating the tree if it does not
988  *    exist.  Returns 0 on success and -1 on failure.
989  */
990 static int
ipfixSourceBaseAddToGlobalList(skIPFIXSourceBase_t * base)991 ipfixSourceBaseAddToGlobalList(
992     skIPFIXSourceBase_t    *base)
993 {
994     const void *rv;
995 
996     pthread_mutex_lock(&global_tree_mutex);
997 
998     if (listener_to_source_base == NULL) {
999         listener_to_source_base = rbinit(listener_to_source_base_find, NULL);
1000         if (listener_to_source_base == NULL) {
1001             pthread_mutex_unlock(&global_tree_mutex);
1002             return -1;
1003         }
1004     }
1005 
1006     rv = rbsearch(base, listener_to_source_base);
1007     pthread_mutex_unlock(&global_tree_mutex);
1008 
1009     if (base != rv) {
1010         if (NULL == rv) {
1011             CRITMSG("Out of memory");
1012         } else {
1013             CRITMSG("Duplicate listener created");
1014         }
1015         return -1;
1016     }
1017     return 0;
1018 }
1019 
1020 
1021 #if 0
1022 /*
1023  *    The following is #if 0'ed out because it fails to do what it
1024  *    is intended to do.
1025  *
1026  *    The issue appears to be that fixbuf and SiLK use different
1027  *    flags to getaddrinfo(), which changes the set of addresses
1028  *    that are returned.
1029  */
1030 /*
1031  *    fixbuf does not return an error when it cannot bind to any
1032  *    listening address, which means the application can start
1033  *    correctly but not be actively listening.  The following code
1034  *    attempts to detect this situation before creating the fixbuf
1035  *    listener by binding to the port.
1036  *
1037  *    Return 0 when able to successfully bind to the address or -1
1038  *    otherwise.
1039  */
1040 static int
1041 ipfixSourceBaseVerifyOpenPort(
1042     const sk_sockaddr_array_t  *listen_address)
1043 {
1044     const sk_sockaddr_t *addr;
1045     char addr_name[PATH_MAX];
1046     int *sock_array;
1047     int *s;
1048     uint16_t port = 0;
1049 
1050     s = sock_array = (int *)calloc(skSockaddrArrayGetSize(listen_address),
1051                                    sizeof(int));
1052     if (sock_array == NULL) {
1053         return -1;
1054     }
1055 
1056     DEBUGMSG(("Attempting to bind %" PRIu32 " addresses for %s"),
1057              skSockaddrArrayGetSize(listen_address),
1058              skSockaddrArrayGetHostPortPair(listen_address));
1059     for (i = 0; i < skSockaddrArrayGetSize(listen_address); ++i) {
1060         addr = skSockaddrArrayGet(listen_address, i);
1061         skSockaddrString(addr_name, sizeof(addr_name), addr);
1062 
1063         /* Get a socket */
1064         *s = socket(addr->sa.sa_family, SOCK_DGRAM, 0);
1065         if (-1 == *s) {
1066             DEBUGMSG("Skipping %s: Unable to create dgram socket: %s",
1067                      addr_name, strerror(errno));
1068             continue;
1069         }
1070         /* Bind socket to port */
1071         if (bind(*s, &addr->sa, skSockaddrLen(addr)) == -1) {
1072             DEBUGMSG("Skipping %s: Unable to bind: %s",
1073                      addr_name, strerror(errno));
1074             close(*s);
1075             *s = -1;
1076             continue;
1077         }
1078         DEBUGMSG("Bound %s for listening", addr_name);
1079         ++s;
1080         if (0 == port) {
1081             port = skSockaddrGetPort(addr);
1082         }
1083         assert(port == skSockaddrGetPort(addr));
1084     }
1085 
1086     if (s == sock_array) {
1087         ERRMSG("Failed to bind any addresses for %s",
1088                skSockaddrArrayGetHostPortPair(listen_address));
1089         free(sock_array);
1090         return -1;
1091     }
1092     DEBUGMSG(("Bound %" PRIu32 "/%" PRIu32 " addresses for %s"),
1093              (uint32_t)(s-sock_array),
1094              skSockaddrArrayGetSize(listen_address),
1095              skSockaddrArrayGetHostPortPair(listen_address));
1096     while (s != sock_array) {
1097         --s;
1098         close(*s);
1099     }
1100     free(sock_array);
1101     return 0;
1102 }
1103 #endif  /* 0 */
1104 
1105 
1106 /*
1107  *    Creates a IPFIX source listening on the network.
1108  *
1109  *    'probe' is the probe associated with the source.  'max_flows' is
1110  *    the number of IPFIX flows the created source can buffer in
1111  *    memory.
1112  *
1113  *    Returns a IPFIX source on success, or NULL on failure.
1114  */
1115 static skIPFIXSource_t *
ipfixSourceCreateFromSockaddr(const skpc_probe_t * probe,uint32_t max_flows)1116 ipfixSourceCreateFromSockaddr(
1117     const skpc_probe_t *probe,
1118     uint32_t            max_flows)
1119 {
1120     skIPFIXSource_t *source = NULL;
1121     skIPFIXSourceBase_t *localbase = NULL;
1122     skIPFIXSourceBase_t *base;
1123     const sk_sockaddr_array_t *listen_address;
1124     const sk_sockaddr_array_t **accept_from;
1125     peeraddr_source_t target;
1126     const peeraddr_source_t *found;
1127     skpc_proto_t protocol;
1128     GError *err = NULL;
1129     char port_string[7];
1130     uint32_t accept_from_count;
1131     uint32_t i;
1132     uint32_t j;
1133     int rv;
1134 
1135     TRACE_ENTRY;
1136 
1137     /* Check the protocol */
1138     protocol = skpcProbeGetProtocol(probe);
1139 
1140     /* Get the list of accept-from-host addresses. */
1141     accept_from_count = skpcProbeGetAcceptFromHost(probe, &accept_from);
1142 
1143     /* Get the listen address. */
1144     rv = skpcProbeGetListenOnSockaddr(probe, &listen_address);
1145     if (rv == -1) {
1146         goto ERROR;
1147     }
1148 
1149     /* Check to see if there is an existing base object for that
1150      * listen address */
1151     pthread_mutex_lock(&global_tree_mutex);
1152     if (!listener_to_source_base) {
1153         base = NULL;
1154     } else {
1155         /* Loop through all current bases, and compare based on
1156          * listen_address and protocol */
1157         fbTransport_t transport;
1158         RBLIST *iter;
1159 
1160         SILK_PROTO_TO_FIXBUF_TRANSPORT(protocol, &transport);
1161         iter = rbopenlist(listener_to_source_base);
1162         while ((base = (skIPFIXSourceBase_t *)rbreadlist(iter)) != NULL) {
1163             if (transport == base->connspec->transport
1164                 && skSockaddrArrayMatches(base->listen_address,
1165                                           listen_address, 0))
1166             {
1167                 /* Found a match.  'base' is now set to the matching
1168                  * base */
1169                 break;
1170             }
1171         }
1172         rbcloselist(iter);
1173     }
1174     pthread_mutex_unlock(&global_tree_mutex);
1175 
1176 #if 0
1177     if (NULL == base) {
1178         if (ipfixSourceBaseVerifyOpenPort(listen_address)) {
1179             goto ERROR;
1180         }
1181     }
1182 #endif  /* 0 */
1183 
1184     /* if there is an existing base on this listen-address, compare
1185      * its accept-from settings with those on this probe */
1186     if (base) {
1187         if (accept_from == NULL) {
1188             /* The new listener wants to be promiscuous but another
1189              * listener already exists. */
1190             goto ERROR;
1191         }
1192         pthread_mutex_lock(&base->mutex);
1193         if (base->any) {
1194             /* Already have a listener, and it is promiscuous. */
1195             pthread_mutex_unlock(&base->mutex);
1196             goto ERROR;
1197         }
1198         /* Ensure the accept-from addresses are unique. */
1199         for (j = 0; j < accept_from_count; ++j) {
1200             for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
1201                 target.addr = skSockaddrArrayGet(accept_from[j], i);
1202                 found = ((const peeraddr_source_t*)
1203                          rbfind(&target, base->addr_to_source));
1204                 if (found != NULL) {
1205                     pthread_mutex_unlock(&base->mutex);
1206                     goto ERROR;
1207                 }
1208             }
1209         }
1210         pthread_mutex_unlock(&base->mutex);
1211     }
1212 
1213     /* Create a new source object */
1214     source = (skIPFIXSource_t *)calloc(1, sizeof(*source));
1215     if (source == NULL) {
1216         goto ERROR;
1217     }
1218 
1219     /* Keep a handle to the probe and the probe's name */
1220     source->probe = probe;
1221     source->name = skpcProbeGetName(probe);
1222 
1223     if (PROBE_ENUM_NETFLOW_V9 == skpcProbeGetType(probe)
1224         || PROBE_ENUM_SFLOW == skpcProbeGetType(probe))
1225     {
1226         /* Create the look-up table for skIPFIXConnection_t's */
1227         source->connections = rbinit(pointer_cmp, NULL);
1228         if (NULL == source->connections) {
1229             goto ERROR;
1230         }
1231     }
1232 
1233     /* Create the circular buffer */
1234     if (skCircBufCreate(&source->circbuf, sizeof(rwRec), max_flows)) {
1235         goto ERROR;
1236     }
1237     /* Ready the first location in the circular buffer for writing */
1238     if (skCircBufGetWriterBlock(
1239             source->circbuf, &source->current_record, NULL))
1240     {
1241         skAbort();
1242     }
1243 
1244     pthread_mutex_init(&source->stats_mutex, NULL);
1245 
1246     if (base != NULL) {
1247         /* If there is an existing base, add the source to it. */
1248         if (ipfixSourceBaseAddIPFIXSource(base, source)) {
1249             goto ERROR;
1250         }
1251     } else {
1252         /* No existing base, create a new one */
1253 
1254         /* Create the base object */
1255         base = localbase = ipfixSourceCreateBase();
1256         if (base == NULL) {
1257             goto ERROR;
1258         }
1259         pthread_mutex_lock(&global_tree_mutex);
1260         ++source_base_count;
1261         pthread_mutex_unlock(&global_tree_mutex);
1262 
1263         /* Set the listen_address */
1264         base->listen_address = listen_address;
1265 
1266         /* Create a connspec in order to create a listener */
1267         base->connspec = (fbConnSpec_t *)calloc(1, sizeof(*base->connspec));
1268         if (base->connspec == NULL) {
1269             goto ERROR;
1270         }
1271         if (skSockaddrArrayGetHostname(listen_address)
1272             != sk_sockaddr_array_anyhostname)
1273         {
1274             base->connspec->host
1275                 = strdup(skSockaddrArrayGetHostname(listen_address));
1276             if (base->connspec->host == NULL) {
1277                 goto ERROR;
1278             }
1279         }
1280         rv = snprintf(port_string, sizeof(port_string), "%i",
1281                       skSockaddrGetPort(skSockaddrArrayGet(listen_address, 0)));
1282         assert((size_t)rv < sizeof(port_string));
1283         base->connspec->svc = strdup(port_string);
1284         if (base->connspec->svc == NULL) {
1285             goto ERROR;
1286         }
1287         SILK_PROTO_TO_FIXBUF_TRANSPORT(protocol, &base->connspec->transport);
1288 
1289         /* Create the listener */
1290         pthread_mutex_lock(&create_listener_mutex);
1291         base->listener = skiCreateListener(base, &err);
1292         if (NULL == base->listener) {
1293             pthread_mutex_unlock(&create_listener_mutex);
1294             goto ERROR;
1295         }
1296         if (SKPC_PROTO_UDP == protocol) {
1297             fbCollector_t *collector;
1298 
1299             if (!fbListenerGetCollector(base->listener, &collector, &err)) {
1300                 pthread_mutex_unlock(&create_listener_mutex);
1301                 goto ERROR;
1302             }
1303             /* Enable the multi-UDP support in libfixbuf. */
1304             fbCollectorSetUDPMultiSession(collector, 1);
1305 
1306 #if !FIXBUF_CHECK_VERSION(2,0,0)
1307             /* Treat UDP streams from the same address but different
1308              * ports as different streams, in accordance with the
1309              * IPFIX/NetFlow v9 RFCs. */
1310             fbCollectorManageUDPStreamByPort(collector, TRUE);
1311 #endif  /* FIXBUF_CHECK_VERSION */
1312 
1313             /* If this is a Netflow v9 source or an sFlow source, tell
1314              * the collector. */
1315             switch (skpcProbeGetType(source->probe)) {
1316               case PROBE_ENUM_IPFIX:
1317                 break;
1318               case PROBE_ENUM_NETFLOW_V9:
1319                 if (!fbCollectorSetNetflowV9Translator(collector, &err)) {
1320                     pthread_mutex_unlock(&create_listener_mutex);
1321                     goto ERROR;
1322                 }
1323                 break;
1324               case PROBE_ENUM_SFLOW:
1325                 if (!fbCollectorSetSFlowTranslator(collector, &err)) {
1326                     pthread_mutex_unlock(&create_listener_mutex);
1327                     goto ERROR;
1328                 }
1329                 break;
1330               default:
1331                 skAbortBadCase(skpcProbeGetType(source->probe));
1332             }
1333         }
1334         pthread_mutex_unlock(&create_listener_mutex);
1335 
1336         pthread_mutex_init(&base->mutex, NULL);
1337         pthread_cond_init(&base->cond, NULL);
1338 
1339         /* add the source to the base */
1340         if (ipfixSourceBaseAddIPFIXSource(base, source)) {
1341             goto ERROR;
1342         }
1343 
1344         /* Add base to list of bases, creating the list if needed */
1345         if (ipfixSourceBaseAddToGlobalList(base)) {
1346             goto ERROR;
1347         }
1348 
1349         /* Start the listener thread */
1350         pthread_mutex_lock(&base->mutex);
1351         rv = skthread_create(skSockaddrArrayGetHostPortPair(listen_address),
1352                              &base->thread, ipfix_reader, (void*)base);
1353         if (rv != 0) {
1354             pthread_mutex_unlock(&base->mutex);
1355             WARNINGMSG("Unable to spawn new thread for '%s': %s",
1356                        skSockaddrArrayGetHostPortPair(listen_address),
1357                        strerror(rv));
1358             goto ERROR;
1359         }
1360 
1361         /* Wait for the thread to really begin */
1362         do {
1363             pthread_cond_wait(&base->cond, &base->mutex);
1364         } while (!base->started);
1365         pthread_mutex_unlock(&base->mutex);
1366     }
1367 
1368     TRACE_RETURN(source);
1369 
1370   ERROR:
1371     if (err) {
1372         ERRMSG("'%s': %s", source->name, err->message);
1373     }
1374     g_clear_error(&err);
1375     if (localbase) {
1376         if (localbase->listener) {
1377             fbListenerFree(localbase->listener);
1378         }
1379         if (localbase->connspec) {
1380             free_connspec(localbase->connspec);
1381         }
1382         if (localbase->addr_to_source) {
1383             rbdestroy(localbase->addr_to_source);
1384         }
1385         free(localbase);
1386         pthread_mutex_lock(&global_tree_mutex);
1387         --source_base_count;
1388         if (0 == source_base_count) {
1389             skiInfoModelFree();
1390             if (listener_to_source_base) {
1391                 rbdestroy(listener_to_source_base);
1392                 listener_to_source_base = NULL;
1393             }
1394         }
1395         pthread_mutex_unlock(&global_tree_mutex);
1396     }
1397     if (source) {
1398         if (source->circbuf) {
1399             skCircBufDestroy(source->circbuf);
1400         }
1401         if (source->connections) {
1402             rbdestroy(source->connections);
1403         }
1404         free(source);
1405     }
1406     TRACE_RETURN(NULL);
1407 }
1408 
1409 
1410 /*
1411  *    Handler to print log messages.  This will be invoked by g_log()
1412  *    and the other logging functions from GLib2.
1413  */
1414 static void
ipfixGLogHandler(const gchar * log_domain,GLogLevelFlags log_level,const gchar * message,gpointer user_data)1415 ipfixGLogHandler(
1416     const gchar        *log_domain,
1417     GLogLevelFlags      log_level,
1418     const gchar        *message,
1419     gpointer            user_data)
1420 {
1421     /* In syslog, CRIT is worse than ERR; in Glib2 ERROR is worse than
1422      * CRITICAL. */
1423     SK_UNUSED_PARAM(log_domain);
1424     SK_UNUSED_PARAM(user_data);
1425 
1426     switch (log_level & G_LOG_LEVEL_MASK) {
1427       case G_LOG_LEVEL_CRITICAL:
1428         ERRMSG("%s", message);
1429         break;
1430       case G_LOG_LEVEL_WARNING:
1431         WARNINGMSG("%s", message);
1432         break;
1433       case G_LOG_LEVEL_MESSAGE:
1434         NOTICEMSG("%s", message);
1435         break;
1436       case G_LOG_LEVEL_INFO:
1437         INFOMSG("%s", message);
1438         break;
1439       case G_LOG_LEVEL_DEBUG:
1440         DEBUGMSG("%s", message);
1441         break;
1442       default:
1443         CRITMSG("%s", message);
1444         break;
1445     }
1446 }
1447 
1448 /*
1449  *    GLib Log handler to discard messages.
1450  */
1451 static void
ipfixGLogHandlerVoid(const gchar * log_domain,GLogLevelFlags log_level,const gchar * message,gpointer user_data)1452 ipfixGLogHandlerVoid(
1453     const gchar        *log_domain,
1454     GLogLevelFlags      log_level,
1455     const gchar        *message,
1456     gpointer            user_data)
1457 {
1458     SK_UNUSED_PARAM(*log_domain);
1459     SK_UNUSED_PARAM(log_level);
1460     SK_UNUSED_PARAM(*message);
1461     SK_UNUSED_PARAM(user_data);
1462     return;
1463 }
1464 
1465 
1466 /*
1467  *  ipfixSourceGlibInitialize();
1468  *
1469  *    Initialize the GLib slice allocator.  Since there is no way to
1470  *    de-initialize the slice allocator, valgrind will report this
1471  *    memory as "still-reachable".  We would rather have this
1472  *    "still-reachable" memory reported in a well-known location,
1473  *    instead of hidden somewhere within fixbuf.
1474  */
1475 static void
ipfixSourceGlibInitialize(void)1476 ipfixSourceGlibInitialize(
1477     void)
1478 {
1479 #if (GLIB_MAJOR_VERSION == 2 && GLIB_MINOR_VERSION >= 10)
1480 #define MEMORY_SIZE 128
1481     gpointer memory;
1482 
1483     memory = g_slice_alloc(MEMORY_SIZE);
1484     g_slice_free1(MEMORY_SIZE, memory);
1485 #endif
1486 }
1487 
1488 
1489 /*
1490  *    Performs any initialization required prior to creating the IPFIX
1491  *    sources.  Returns 0 on success, or -1 on failure.
1492  */
1493 int
skIPFIXSourcesSetup(void)1494 skIPFIXSourcesSetup(
1495     void)
1496 {
1497     const char *env;
1498     GLogLevelFlags log_levels = (GLogLevelFlags)(G_LOG_LEVEL_CRITICAL
1499                                                  | G_LOG_LEVEL_WARNING
1500                                                  | G_LOG_LEVEL_MESSAGE
1501                                                  | G_LOG_LEVEL_INFO
1502                                                  | G_LOG_LEVEL_DEBUG);
1503 
1504     /* initialize the slice allocator */
1505     ipfixSourceGlibInitialize();
1506 
1507     /* As of glib 2.32, g_thread_init() is deprecated. */
1508 #if (GLIB_MAJOR_VERSION == 2 && GLIB_MINOR_VERSION < 32)
1509     /* tell fixbuf (glib) we are a threaded program.  this will abort
1510      * if glib does not have thread support. */
1511     if (!g_thread_supported()) {
1512         g_thread_init(NULL);
1513     }
1514 #endif
1515 
1516     /* set a log handler for messages from glib, which we always want
1517      * to include in our log file.
1518      * http://developer.gnome.org/glib/stable/glib-Message-Logging.html */
1519     g_log_set_handler("GLib", log_levels, &ipfixGLogHandler, NULL);
1520 
1521     /* set a log handler for messages from fixbuf, maybe using a void
1522      * handler for warnings. */
1523     env = getenv(SK_ENV_FIXBUF_SUPPRESS_WARNING);
1524     if (env && *env && 0 == strcmp("1", env)) {
1525         /* suppress warnings by setting a void handler */
1526         log_levels = (GLogLevelFlags)((unsigned int)log_levels
1527                                       & ~(unsigned int)G_LOG_LEVEL_WARNING);
1528         g_log_set_handler(
1529             NULL, G_LOG_LEVEL_WARNING, &ipfixGLogHandlerVoid, NULL);
1530     }
1531     g_log_set_handler(NULL, log_levels, &ipfixGLogHandler, NULL);
1532 
1533     /* Determine which information elements should be used when
1534      * defining the NetFlow v9 Sampling template. */
1535     ski_nf9sampling_check_spec();
1536 
1537     return 0;
1538 }
1539 
1540 
1541 /*
1542  *    Free any state allocated by skIPFIXSourcesSetup().
1543  */
1544 void
skIPFIXSourcesTeardown(void)1545 skIPFIXSourcesTeardown(
1546     void)
1547 {
1548     skiTeardown();
1549 }
1550 
1551 
1552 /*
1553  *    Creates a IPFIX source based on an skpc_probe_t.
1554  *
1555  *    If the source is a network-based probe, this function also
1556  *    starts the collection process.
1557  *
1558  *    When creating a source from a network-based probe, the 'params'
1559  *    union should have the 'max_pkts' member specify the maximum
1560  *    number of packets to buffer in memory for this source.
1561  *
1562  *    When creating a source from a probe that specifies either a file
1563  *    or a directory that is polled for files, the 'params' union must
1564  *    have the 'path_name' specify the full path of the file to
1565  *    process.
1566  *
1567  *    Return the new source, or NULL on error.
1568  */
1569 skIPFIXSource_t *
skIPFIXSourceCreate(const skpc_probe_t * probe,const skFlowSourceParams_t * params)1570 skIPFIXSourceCreate(
1571     const skpc_probe_t         *probe,
1572     const skFlowSourceParams_t *params)
1573 {
1574     skIPFIXSource_t *source;
1575 
1576     TRACE_ENTRY;
1577 
1578     /* Check whether this is a file-based probe---either handles a
1579      * single file or files pulled from a directory poll */
1580     if (NULL != skpcProbeGetPollDirectory(probe)
1581         || NULL != skpcProbeGetFileSource(probe))
1582     {
1583         if (NULL == params->path_name) {
1584             TRACE_RETURN(NULL);
1585         }
1586         source = ipfixSourceCreateFromFile(probe, params->path_name);
1587 
1588     } else {
1589         /* must be a network-based source */
1590         source = ipfixSourceCreateFromSockaddr(probe, params->max_pkts);
1591     }
1592 
1593     TRACE_RETURN(source);
1594 }
1595 
1596 
1597 /*
1598  *    Stops processing of packets.  This will cause a call to any
1599  *    skIPFIXSourceGetGeneric() function to stop blocking.  Meant to
1600  *    be used as a prelude to skIPFIXSourceDestroy() in threaded code.
1601  */
1602 void
skIPFIXSourceStop(skIPFIXSource_t * source)1603 skIPFIXSourceStop(
1604     skIPFIXSource_t    *source)
1605 {
1606     TRACE_ENTRY;
1607 
1608     assert(source);
1609 
1610     /* Mark the source as stopped, and unblock the circular buffer */
1611     source->stopped = 1;
1612     if (source->circbuf) {
1613         skCircBufStop(source->circbuf);
1614     }
1615     TRACE_RETURN;
1616 }
1617 
1618 
1619 /*
1620  *    Destroys an IPFIX source.
1621  */
1622 void
skIPFIXSourceDestroy(skIPFIXSource_t * source)1623 skIPFIXSourceDestroy(
1624     skIPFIXSource_t    *source)
1625 {
1626     skIPFIXSourceBase_t *base;
1627     const sk_sockaddr_array_t **accept_from;
1628     peeraddr_source_t target;
1629     const peeraddr_source_t *found;
1630     uint32_t accept_from_count;
1631     uint32_t i;
1632     uint32_t j;
1633 
1634     TRACE_ENTRY;
1635 
1636     if (!source) {
1637         TRACE_RETURN;
1638     }
1639 
1640     accept_from_count = skpcProbeGetAcceptFromHost(source->probe,&accept_from);
1641 
1642     assert(source->base);
1643 
1644     base = source->base;
1645 
1646     pthread_mutex_lock(&base->mutex);
1647 
1648     /* Remove the source from the red-black tree */
1649     if (base->addr_to_source && accept_from) {
1650         /* Remove the source's accept-from-host addresses from
1651          * base->addr_to_source */
1652         for (j = 0; j < accept_from_count; ++j) {
1653             for (i = 0; i < skSockaddrArrayGetSize(accept_from[j]); ++i) {
1654                 target.addr = skSockaddrArrayGet(accept_from[j], i);
1655                 found = ((const peeraddr_source_t*)
1656                          rbdelete(&target, base->addr_to_source));
1657                 if (found && (found->source == source)) {
1658                     free((void*)found);
1659                 }
1660             }
1661         }
1662     }
1663 
1664     /* Stop the source */
1665     skIPFIXSourceStop(source);
1666 
1667     /* If the source is not currently being referenced by an fBuf_t,
1668      * free it, otherwise mark it to be destroyed when the fBuf_t is
1669      * freed by fixbufDisconnect(). */
1670     if (source->connection_count == 0) {
1671         free_source(source);
1672     } else {
1673         source->destroy = 1;
1674     }
1675 
1676     /* Decrement the source reference count */
1677     assert(base->source_count);
1678     --base->source_count;
1679 
1680     TRACEMSG(3, ("base %p source_count is %u", base, base->source_count));
1681 
1682     /* If this base object is still referenced by sources, return */
1683     if (base->source_count != 0) {
1684         pthread_mutex_unlock(&base->mutex);
1685         TRACE_RETURN;
1686     }
1687 
1688     /* Otherwise, we must destroy the base stop its thread */
1689     base->destroyed = 1;
1690 
1691     if (base->listener) {
1692         TRACEMSG(3, ("base %p calling fbListenerInterrupt", base));
1693 
1694         /* Unblock the fbListenerWait() call */
1695         fbListenerInterrupt(base->listener);
1696 
1697         /* Signal that the thread is to exit */
1698         pthread_cond_broadcast(&base->cond);
1699 
1700         TRACEMSG(3, ("base %p waiting for running variable", base));
1701 
1702         /* Wait for the thread to exit */
1703         while (base->running) {
1704             pthread_cond_wait(&base->cond, &base->mutex);
1705         }
1706 
1707         TRACEMSG(3, ("base %p joining its thread", base));
1708 
1709         /* Acknowledge that the thread has exited */
1710         pthread_join(base->thread, NULL);
1711 
1712         assert(base->listener == NULL);
1713 
1714         /* Free the connspec */
1715         free_connspec(base->connspec);
1716 
1717         /* Destroy the red-black tree */
1718         if (base->addr_to_source) {
1719             rbdestroy(base->addr_to_source);
1720         }
1721 
1722         pthread_cond_destroy(&base->cond);
1723 
1724         pthread_mutex_unlock(&base->mutex);
1725         pthread_mutex_destroy(&base->mutex);
1726     }
1727 
1728     TRACEMSG(3, ("base %p is free", base));
1729 
1730     free(base);
1731 
1732     pthread_mutex_lock(&global_tree_mutex);
1733     --source_base_count;
1734     if (0 == source_base_count) {
1735         /* When the last base is removed, destroy the global base
1736          * list, and call the teardown function for the libskipfix
1737          * library to free any global objects allocated there. */
1738         if (listener_to_source_base) {
1739             rbdestroy(listener_to_source_base);
1740             listener_to_source_base = NULL;
1741         }
1742         skiTeardown();
1743     }
1744     pthread_mutex_unlock(&global_tree_mutex);
1745     TRACE_RETURN;
1746 }
1747 
1748 
1749 
1750 
1751 /*
1752  *    Requests a SiLK Flow record from the IPFIX source 'source'.
1753  *
1754  *    This function will block if there are no IPFIX flows available
1755  *    from which to create a SiLK Flow record.
1756  *
1757  *    Returns 0 on success, -1 on failure.
1758  */
1759 int
skIPFIXSourceGetGeneric(skIPFIXSource_t * source,rwRec * rwrec)1760 skIPFIXSourceGetGeneric(
1761     skIPFIXSource_t    *source,
1762     rwRec              *rwrec)
1763 {
1764     rwRec *rec;
1765     int rv;
1766 
1767     TRACE_ENTRY;
1768 
1769     assert(source);
1770     assert(rwrec);
1771 
1772     if (source->circbuf) {
1773         /* Reading from the circular buffer */
1774         if (skCircBufGetReaderBlock(source->circbuf, &rec, NULL)) {
1775             TRACE_RETURN(-1);
1776         }
1777         RWREC_COPY(rwrec, rec);
1778         TRACE_RETURN(0);
1779     }
1780 
1781     rv = ipfixSourceGetRecordFromFile(source, rwrec);
1782     TRACE_RETURN(rv);
1783 }
1784 
1785 
1786 
1787 /* Log statistics associated with a IPFIX source, and then clear the
1788  * statistics. */
1789 void
skIPFIXSourceLogStatsAndClear(skIPFIXSource_t * source)1790 skIPFIXSourceLogStatsAndClear(
1791     skIPFIXSource_t    *source)
1792 {
1793     TRACE_ENTRY;
1794 
1795     pthread_mutex_lock(&source->stats_mutex);
1796 
1797     /* print log message giving the current statistics on the
1798      * skIPFIXSource_t pointer 'source' */
1799     {
1800         fbCollector_t *collector = NULL;
1801         GError *err = NULL;
1802 
1803         if (source->saw_yafstats_pkt) {
1804             /* IPFIX from yaf: print the stats */
1805 
1806             INFOMSG(("'%s': forward %" PRIu64
1807                      ", reverse %" PRIu64
1808                      ", ignored %" PRIu64
1809                      "; yaf: recs %" PRIu64
1810                      ", pkts %" PRIu64
1811                      ", dropped-pkts %" PRIu64
1812                      ", ignored-pkts %" PRIu64
1813                      ", bad-sequence-pkts %" PRIu64
1814                      ", expired-frags %" PRIu64),
1815                     source->name,
1816                     source->forward_flows,
1817                     source->reverse_flows,
1818                     source->ignored_flows,
1819                     source->yaf_exported_flows,
1820                     source->yaf_processed_packets,
1821                     source->yaf_dropped_packets,
1822                     source->yaf_ignored_packets,
1823                     source->yaf_notsent_packets,
1824                     source->yaf_expired_fragments);
1825 
1826         } else if (!source->connections
1827                    || !source->base
1828                    || !source->base->listener)
1829         {
1830             /* no data or other IPFIX; print count of SiLK flows
1831              * created */
1832 
1833             INFOMSG(("'%s': forward %" PRIu64
1834                      ", reverse %" PRIu64
1835                      ", ignored %" PRIu64),
1836                     source->name,
1837                     source->forward_flows,
1838                     source->reverse_flows,
1839                     source->ignored_flows);
1840 
1841         } else if (!fbListenerGetCollector(source->base->listener,
1842                                            &collector, &err))
1843         {
1844             /* sFlow or NetFlowV9, but no collector */
1845 
1846             DEBUGMSG("'%s': Unable to get collector for source: %s",
1847                      source->name, err->message);
1848             g_clear_error(&err);
1849 
1850             INFOMSG(("'%s': forward %" PRIu64
1851                      ", reverse %" PRIu64
1852                      ", ignored %" PRIu64),
1853                     source->name,
1854                     source->forward_flows,
1855                     source->reverse_flows,
1856                     source->ignored_flows);
1857 
1858         } else {
1859             /* sFlow or NetFlowV9 */
1860             skIPFIXConnection_t *conn;
1861             RBLIST *iter;
1862             uint64_t prev;
1863 
1864             iter = rbopenlist(source->connections);
1865             while ((conn = (skIPFIXConnection_t *)rbreadlist(iter)) != NULL) {
1866                 /* store the previous number of dropped NF9/sFlow packets
1867                  * and get the new number of dropped packets. */
1868                 prev = conn->prev_yafstats.droppedPacketTotalCount;
1869                 if (skpcProbeGetType(source->probe) == PROBE_ENUM_SFLOW) {
1870                     conn->prev_yafstats.droppedPacketTotalCount
1871                         = fbCollectorGetSFlowMissed(
1872                             collector, &conn->peer_addr.sa, conn->peer_len,
1873                             conn->ob_domain);
1874                 } else {
1875                     conn->prev_yafstats.droppedPacketTotalCount
1876                         = fbCollectorGetNetflowMissed(
1877                             collector, &conn->peer_addr.sa, conn->peer_len,
1878                             conn->ob_domain);
1879                 }
1880                 if (prev > conn->prev_yafstats.droppedPacketTotalCount) {
1881                     /* assume a new collector */
1882                     TRACEMSG(4, (("Assuming new collector: NF9 loss dropped"
1883                                   " old = %" PRIu64 ", new = %" PRIu64),
1884                                  prev,
1885                                  conn->prev_yafstats.droppedPacketTotalCount));
1886                     prev = 0;
1887                 }
1888                 source->yaf_dropped_packets
1889                     += conn->prev_yafstats.droppedPacketTotalCount - prev;
1890             }
1891             rbcloselist(iter);
1892 
1893             INFOMSG(("'%s': forward %" PRIu64
1894                      ", reverse %" PRIu64
1895                      ", ignored %" PRIu64
1896                      ", %s: missing-pkts %" PRIu64),
1897                     source->name,
1898                     source->forward_flows,
1899                     source->reverse_flows,
1900                     source->ignored_flows,
1901                     ((skpcProbeGetType(source->probe) == PROBE_ENUM_SFLOW)
1902                      ? "sflow" : "nf9"),
1903                     source->yaf_dropped_packets);
1904         }
1905     }
1906 
1907 #if SOURCE_LOG_MAX_PENDING_WRITE
1908     if (skpcProbeGetLogFlags(source->probe) & SOURCE_LOG_MAX_PENDING_WRITE) {
1909         INFOMSG(("'%s': Maximum number of read records waiting to be written:"
1910                  " %" PRIu32), source->name, source->max_pending);
1911     }
1912 #endif
1913 
1914     /* reset (set to zero) statistics on the skIPFIXSource_t
1915      * 'source' */
1916     {
1917         source->yaf_dropped_packets = 0;
1918         source->yaf_ignored_packets = 0;
1919         source->yaf_notsent_packets = 0;
1920         source->yaf_expired_fragments = 0;
1921         source->yaf_processed_packets = 0;
1922         source->yaf_exported_flows = 0;
1923         source->forward_flows = 0;
1924         source->reverse_flows = 0;
1925         source->ignored_flows = 0;
1926         source->max_pending = 0;
1927     }
1928 
1929     pthread_mutex_unlock(&source->stats_mutex);
1930     TRACE_RETURN;
1931 }
1932 
1933 
1934 /*
1935 ** Local Variables:
1936 ** mode:c
1937 ** indent-tabs-mode:nil
1938 ** c-basic-offset:4
1939 ** End:
1940 */
1941