1 /** @internal
2  **
3  **
4  ** @file fbcollector.c
5  ** IPFIX Collecting Process single transport session implementation
6  **
7  ** ------------------------------------------------------------------------
8  ** Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
9  ** ------------------------------------------------------------------------
10  ** Authors: Brian Trammell
11  ** ------------------------------------------------------------------------
12  ** @OPENSOURCE_LICENSE_START@
13  ** libfixbuf 2.0
14  **
15  ** Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
16  **
17  ** NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
18  ** ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
19  ** BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
20  ** EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
21  ** LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
22  ** EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
23  ** MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
24  ** ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
25  ** COPYRIGHT INFRINGEMENT.
26  **
27  ** Released under a GNU-Lesser GPL 3.0-style license, please see
28  ** LICENSE.txt or contact permission@sei.cmu.edu for full terms.
29  **
30  ** [DISTRIBUTION STATEMENT A] This material has been approved for
31  ** public release and unlimited distribution.  Please see Copyright
32  ** notice for non-US Government use and distribution.
33  **
34  ** Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
35  ** and Trademark Office by Carnegie Mellon University.
36  **
37  ** DM18-0325
38  ** @OPENSOURCE_LICENSE_END@
39  ** ------------------------------------------------------------------------
40  */
41 
42 /*#define _GNU_SOURCE*/
43 #define _FIXBUF_SOURCE_
44 #include <fixbuf/private.h>
45 
46 #include "fbcollector.h"
47 
48 
49 /*#################################################
50  *
51  * IPFIX functions for reading input, these are
52  * the default functions
53  *
54  *#################################################*/
55 
56 
57 /**
58  * fbCollectorDecodeMsgVL
59  *
60  * decodes the header of a variable length message to determine
61  * how long the message is in order to read the appropriate
62  * amount to complete the message
63  *
64  *
65  * @return FALSE on error, TRUE on success
66  */
fbCollectorDecodeMsgVL(fbCollector_t * collector,fbCollectorMsgVL_t * hdr,size_t b_len,uint16_t * m_len,GError ** err)67 static gboolean fbCollectorDecodeMsgVL(
68     fbCollector_t               *collector,
69     fbCollectorMsgVL_t          *hdr,
70     size_t                      b_len,
71     uint16_t                    *m_len,
72     GError                      **err)
73 {
74     uint16_t                    h_version;
75     uint16_t                    h_len;
76 
77     /* collector is unused in this function*/
78     (void)collector;
79 
80     h_version = g_ntohs(hdr->n_version);
81     if (h_version != 0x000A) {
82         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
83                     "Illegal IPFIX Message version 0x%04x; "
84                     "input is probably not an IPFIX Message stream.",
85                     g_ntohs(hdr->n_version));
86         *m_len = 0;
87         return FALSE;
88     }
89 
90     h_len = g_ntohs(hdr->n_len);
91     if (h_len < 16) {
92         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
93                     "Illegal IPFIX Message length 0x%04x; "
94                     "input is probably not an IPFIX Message stream.",
95                     g_ntohs(hdr->n_len));
96         *m_len = 0;
97         return FALSE;
98     }
99 
100     if (b_len && (h_len > b_len)) {
101         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
102                     "Buffer too small to read IPFIX Message "
103                     "(message size %hu, buffer size %u).",
104                     h_len, (uint32_t)b_len);
105         *m_len = 0;
106         return FALSE;
107     }
108 
109     *m_len = h_len;
110     return TRUE;
111 }
112 
113 /*
114  * fbCollectMessageBuffer
115  *
116  * decodes the header of a variable length message to determine
117  * how long the message is.
118  * this is used for applications that wants to handle the
119  * connection (reading, shutdown, etc.) itself.
120  * An error FB_ERROR_BUFSZ means that the the buffer does not
121  * contain a full message and an additional read should
122  * occur.
123  *
124  * @return FALSE on error, TRUE on success
125  */
126 
fbCollectMessageBuffer(uint8_t * hdr,size_t b_len,size_t * m_len,GError ** err)127 gboolean fbCollectMessageBuffer(
128     uint8_t                     *hdr,
129     size_t                      b_len,
130     size_t                      *m_len,
131     GError                      **err)
132 {
133     fbCollectorMsgVL_t          *iphdr = (fbCollectorMsgVL_t *)hdr;
134     uint16_t                    h_version;
135     uint16_t                    h_len;
136 
137     if (!hdr || (b_len < 16)) {
138         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
139                     "Buffer length too small to contain IPFIX header"
140                     "(buffer size %u).", (uint32_t)b_len);
141         *m_len = 0;
142         return FALSE;
143     }
144 
145     h_version = g_ntohs(iphdr->n_version);
146     if (h_version != 0x000A) {
147         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
148                     "Illegal IPFIX Message version 0x%04x; "
149                     "input is probably not an IPFIX Message stream.",
150                     g_ntohs(iphdr->n_version));
151         *m_len = 0;
152         return FALSE;
153     }
154 
155     h_len = g_ntohs(iphdr->n_len);
156     if (h_len < 16) {
157         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
158                     "Illegal IPFIX Message length 0x%04x; "
159                     "input is probably not an IPFIX Message stream.",
160                     g_ntohs(iphdr->n_len));
161         *m_len = 0;
162         return FALSE;
163     }
164 
165     if (h_len > b_len) {
166         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_BUFSZ,
167                     "Buffer too small to contain IPFIX Message "
168                     "(message size %hu, buffer size %u).",
169                     h_len, (uint32_t)b_len);
170         *m_len = 0;
171         return FALSE;
172     }
173 
174     *m_len = h_len;
175     return TRUE;
176 }
177 
178 
179 /**
180  * fbCollectorMessageHeaderNull
181  *
182  * this is used to process a PDU after it has been read in a message
183  * based transport protocol (UDP, SCTP) to adjust the header if
184  * needed before sending it to post read fixing.  This version does
185  * nothing. NULL transform.
186  *
187  * @param collector pointer to the collector state structure
188  * @param buffer pointer to the message buffer
189  * @param b_len length of the buffer passed in
190  * @param m_len pointer to the length of the resultant buffer
191  * @param err pointer to a GLib error structure
192  *
193  * @return TRUE (this always works)
194  *
195  */
fbCollectorMessageHeaderNull(fbCollector_t * collector,uint8_t * buffer,size_t b_len,uint16_t * m_len,GError ** err)196 static gboolean    fbCollectorMessageHeaderNull(
197     fbCollector_t               *collector __attribute__((unused)),
198     uint8_t                     *buffer __attribute__((unused)),
199     size_t                      b_len,
200     uint16_t                    *m_len,
201     GError                      **err __attribute__((unused)) )
202 {
203     *m_len = b_len;
204     return TRUE;
205 }
206 
207 /**
208  * fbCollectorUDPMessageHeader
209  *
210  * this is used to process a PDU after it has been read in a message
211  * based transport protocol (UDP, SCTP) to adjust the header if
212  * needed before sending it to post read fixing.  This version does
213  * nothing. NULL transform.
214  *
215  * @param collector pointer to the collector state structure
216  * @param buffer pointer to the message buffer
217  * @param b_len length of the buffer passed in
218  * @param m_len pointer to the length of the resultant buffer
219  * @param err pointer to a GLib error structure
220  *
221  * @return TRUE (this always works)
222  *
223  */
fbCollectorUDPMessageHeader(fbCollector_t * collector,uint8_t * buffer,size_t b_len,uint16_t * m_len,GError ** err)224 static gboolean    fbCollectorUDPMessageHeader(
225     fbCollector_t               *collector,
226     uint8_t                     *buffer,
227     size_t                      b_len,
228     uint16_t                    *m_len,
229     GError                      **err)
230 {
231     uint16_t                   h_version;
232 
233     *m_len = b_len;
234 
235     if (b_len > 16) {
236         if (!fbCollectorHasTranslator(collector)) {
237             h_version = g_ntohs(*(uint16_t *)buffer);
238             if (h_version != 0x000A) {
239                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX,
240                             "Illegal IPFIX Message Version 0x%04x",
241                             h_version);
242                 return FALSE;
243             }
244         }
245         collector->obdomain = g_ntohl(*(uint32_t *)(buffer + 12));
246         /* Update collector time */
247         collector->time = time(NULL);
248     }
249 
250     return TRUE;
251 }
252 
253 
254 /**
255  * fbCollectorPostProcNull
256  *
257  * this is used to process a PDU after it has been read in order to transform
258  * it, except that this function does _no_ transforms
259  *
260  * @param collector _not used_
261  *
262  * @return TRUE (always succesfull)
263  *
264  */
fbCollectorPostProcNull(fbCollector_t * collector,uint8_t * dataBuf,size_t * bufLen,GError ** err)265 static gboolean     fbCollectorPostProcNull(
266     fbCollector_t   *collector,
267     uint8_t         *dataBuf,
268     size_t          *bufLen,
269     GError          **err)
270 {
271     (void)collector;
272     (void)dataBuf;
273     (void)bufLen;
274     (void)err;
275 
276     return TRUE;
277 }
278 
279 /**
280  * fbCollectorCloseTranslatorNull
281  *
282  * default function to clean up the translator state, but there is
283  * none, and this function does nothing
284  *
285  * @param collector current collector
286  *
287  */
fbCollectorCloseTranslatorNull(fbCollector_t * collector)288 static void         fbCollectorCloseTranslatorNull(
289     fbCollector_t   *collector)
290 {
291     (void)collector;
292     return;
293 }
294 
295 /**
296  * fbCollectorSessionTimeoutNull
297  *
298  * default function to clean up timed out UDP sessions.
299  * this function does nothing
300  *
301  * @param collector current collector
302  * @param session session that will be timed out
303  *
304  */
fbCollectorSessionTimeoutNull(fbCollector_t * collector,fbSession_t * session)305 static void        fbCollectorSessionTimeoutNull(
306     fbCollector_t     *collector,
307     fbSession_t       *session)
308 {
309     (void)collector;
310     (void)session;
311     return;
312 }
313 
314 
315 /*#################################################
316  *
317  * the rest of the meat of the collector implementation
318  *
319  *#################################################*/
320 
321 /**
322  * fbCollectorReadFile
323  *
324  *
325  *
326  */
fbCollectorReadFile(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)327 static gboolean fbCollectorReadFile(
328     fbCollector_t           *collector,
329     uint8_t                 *msgbase,
330     size_t                  *msglen,
331     GError                  **err)
332 {
333     int                     rc;
334     uint16_t                h_len;
335     gboolean                goodLen;
336 
337     /* Read and decode version and length */
338     g_assert(*msglen > 4);
339 
340     rc = fread(msgbase, 1, 4, collector->stream.fp);
341     if (rc > 0) {
342         goodLen = collector->coreadLen(collector,(fbCollectorMsgVL_t *)msgbase,
343                                        *msglen, &h_len, err);
344         if (FALSE == goodLen) return FALSE;
345         msgbase += 4;
346     } else if (feof(collector->stream.fp)) {
347         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
348                     "End of file");
349         return FALSE;
350     } else {
351         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
352                     "I/O error: %s", strerror(errno));
353         return FALSE;
354     }
355 
356     /* read rest of message */
357     rc = fread(msgbase, 1, h_len - 4, collector->stream.fp);
358     if (rc > 0) {
359         *msglen = rc + 4;
360         if (!collector->copostRead(collector, msgbase, msglen, err)) {
361             return FALSE;
362         }
363         return TRUE;
364     } else if (feof(collector->stream.fp)) {
365         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
366                     "End of file");
367         return FALSE;
368     } else {
369         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
370                     "I/O error: %s", strerror(errno));
371         return FALSE;
372     }
373 }
374 
375 /**
376  * fbCollectorCloseFile
377  *
378  *
379  *
380  */
fbCollectorCloseFile(fbCollector_t * collector)381 static void fbCollectorCloseFile(
382     fbCollector_t   *collector)
383 {
384     if (collector->stream.fp != stdin) {
385         fclose(collector->stream.fp);
386     }
387     collector->active = FALSE;
388 }
389 
390 /**
391  * fbCollectorAllocFP
392  *
393  *
394  *
395  */
fbCollectorAllocFP(void * ctx,FILE * fp)396 fbCollector_t *fbCollectorAllocFP(
397     void            *ctx,
398     FILE            *fp)
399 {
400     fbCollector_t   *collector = NULL;
401 
402     g_assert(fp);
403 
404     /* Create a new collector */
405     collector = g_slice_new0(fbCollector_t);
406 
407     /* Fill the collector in */
408     collector->ctx = ctx;
409     collector->stream.fp = fp;
410     collector->bufferedStream = TRUE;
411     collector->active = TRUE;
412     collector->coread = fbCollectorReadFile;
413     collector->copostRead = fbCollectorPostProcNull;
414     collector->coreadLen = fbCollectorDecodeMsgVL;
415     collector->comsgHeader = fbCollectorMessageHeaderNull;
416     collector->cotransClose = fbCollectorCloseTranslatorNull;
417     collector->cotimeOut = fbCollectorSessionTimeoutNull;
418     collector->translationActive = FALSE;
419     collector->rip = -1;
420     collector->wip = -1;
421 
422     /* All done */
423     return collector;
424 }
425 
426 /**
427  * fbCollectorAllocFile
428  *
429  *
430  *
431  */
fbCollectorAllocFile(void * ctx,const char * path,GError ** err)432 fbCollector_t *fbCollectorAllocFile(
433     void            *ctx,
434     const char      *path,
435     GError          **err)
436 {
437     fbCollector_t   *collector = NULL;
438     FILE            *fp = NULL;
439 
440     /* check to see if we're opening stdin */
441     if ((strlen(path) == 1) && (path[0] == '-'))
442     {
443         /* don't open a terminal */
444         if (isatty(fileno(stdin))) {
445             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
446                         "Refusing to open stdin terminal for collection");
447             return NULL;
448         }
449 
450         /* yep, stdin */
451         fp = stdin;
452     } else {
453         /* nope, just a regular file; open it. */
454         fp = fopen(path, "r");
455     }
456 
457     /* check for error */
458     if (!fp) {
459         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
460                     "Couldn't open %s for collection: %s",
461                     path, strerror(errno));
462         return NULL;
463     }
464 
465     /* allocate a collector */
466     collector = fbCollectorAllocFP(ctx, fp);
467 
468     /* set the file close function */
469     collector->coclose = fbCollectorCloseFile;
470 
471     /* set the default collector function */
472     collector->copostRead = fbCollectorPostProcNull;
473 
474     /* default translator cleanup function */
475     collector->cotransClose = fbCollectorCloseTranslatorNull;
476 
477     /* set the default message read length function */
478     collector->coreadLen = fbCollectorDecodeMsgVL;
479 
480     /* set the default message header transform function */
481     collector->comsgHeader = fbCollectorMessageHeaderNull;
482 
483     /* set the default session timed out function - won't get called */
484     collector->cotimeOut = fbCollectorSessionTimeoutNull;
485 
486     /* mark the stream if it is a buffered file pointer */
487     collector->bufferedStream = TRUE;
488 
489     /* set that a input translator is not in use */
490     collector->translationActive = FALSE;
491 
492     /* since we're not a listener */
493     collector->rip = -1;
494     collector->wip = -1;
495 
496     /* all done */
497     return collector;
498 }
499 
500 #if FB_ENABLE_SCTP
501 
502 /**
503  * fbCollectorReadSCTP
504  *
505  *
506  *
507  */
fbCollectorReadSCTP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)508 static gboolean fbCollectorReadSCTP(
509     fbCollector_t   *collector,
510     uint8_t         *msgbase,
511     size_t          *msglen,
512     GError          **err)
513 {
514     uint16_t                msgSize;
515     struct sockaddr         peer;
516     socklen_t               peerlen = sizeof(peer);
517     struct sctp_sndrcvinfo  sri;
518     int                     sctp_flags = 0;
519     int                     rc;
520 
521     rc = sctp_recvmsg(collector->stream.fd, msgbase, *msglen,
522                       &peer, &peerlen, &sri, &sctp_flags);
523 
524     if (rc > 0) {
525         if (!collector->comsgHeader(collector, msgbase, rc, &msgSize, err)) {
526             return FALSE;
527         }
528         *msglen = msgSize;
529         if (!collector->copostRead(collector, msgbase, msglen, err)) {
530             return FALSE;
531         }
532         return TRUE;
533     } else if (rc == 0) {
534         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
535                     "End of file");
536         return FALSE;
537     } else if (errno == EINTR) {
538         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
539                     "SCTP read interrupt");
540         return FALSE;
541     } else {
542         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
543                     "TCP I/O error: %s", strerror(errno));
544         return FALSE;
545     }
546 }
547 #endif /* FB_ENABLE_SCTP */
548 
fbCollectorHandleSelect(fbCollector_t * collector)549 static int fbCollectorHandleSelect(
550     fbCollector_t   *collector)
551 {
552     fd_set  rdfds;
553     int     maxfd;
554     int     count;
555     int     retVal = 0;
556     uint8_t byte;
557 
558     g_assert(collector);
559 
560     if (collector->rip > collector->stream.fd) {
561         maxfd = collector->rip;
562     } else {
563         maxfd = collector->stream.fd;
564     }
565 
566     maxfd++;
567 
568     FD_ZERO(&rdfds);
569     FD_SET(collector->rip, &rdfds);
570     FD_SET(collector->stream.fd, &rdfds);
571 
572     count = select(maxfd, &rdfds, NULL, NULL, NULL);
573 
574     if (count) {
575         if (FD_ISSET(collector->stream.fd, &rdfds)) {
576             retVal = 0;
577         }
578 
579         if (FD_ISSET(collector->rip, &rdfds)) {
580             read(collector->rip, &byte, sizeof(byte));
581             return -1;
582         }
583         return retVal;
584     } else {
585         return -1;
586     }
587 }
588 
589 /**
590  * fbCollectorReadTCP
591  *
592  *
593  *
594  */
fbCollectorReadTCP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)595 static gboolean fbCollectorReadTCP(
596     fbCollector_t   *collector,
597     uint8_t         *msgbase,
598     size_t          *msglen,
599     GError          **err)
600 {
601     int                     rc;
602     uint16_t                h_len, rrem;
603     gboolean                goodLen;
604 
605     /* Read and decode version and length */
606     g_assert(*msglen > 4);
607     rrem = 4;
608     while (rrem) {
609         rc = fbCollectorHandleSelect(collector);
610 
611         if (rc < 0) {
612             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
613                         "Interrupted by pipe");
614             /* interrupted by pipe read or other error with select*/
615             return FALSE;
616         }
617 
618         rc = read(collector->stream.fd, msgbase, rrem);
619         if (rc > 0) {
620             rrem -= rc;
621             msgbase += rc;
622         } else if (rc == 0) {
623             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
624                         "End of file");
625             return FALSE;
626         } else if (errno == EINTR) {
627             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
628                         "TCP read interrupt at message start");
629             return FALSE;
630         } else {
631             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
632                         "TCP I/O error: %s", strerror(errno));
633             return FALSE;
634         }
635     }
636     goodLen = collector->coreadLen(collector,
637                                    (fbCollectorMsgVL_t *)(msgbase - 4),
638                                    *msglen, &h_len, err);
639     if (FALSE == goodLen) return FALSE;
640 
641     /* read rest of message */
642     rrem = h_len - 4;
643     while (rrem) {
644         rc = fbCollectorHandleSelect(collector);
645 
646         if (rc < 0) {
647             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
648                         "Interrupted by pipe");
649             /* interrupted by pipe read or other error with select*/
650             return FALSE;
651           }
652         rc = read(collector->stream.fd, msgbase, rrem);
653         if (rc > 0) {
654             rrem -= rc;
655             msgbase += rc;
656         } else if (rc == 0) {
657             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
658                         "End of file");
659             return FALSE;
660         } else if (errno == EINTR) {
661             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
662                         "TCP read interrupt in message");
663             return FALSE;
664         } else {
665             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
666                         "TCP I/O error: %s", strerror(errno));
667             return FALSE;
668         }
669     }
670 
671     /* Post process, if needed and return message length from header. */
672     *msglen = h_len;
673     if (!collector->copostRead(collector, msgbase, msglen, err)) {
674         return FALSE;
675     }
676     return TRUE;
677 }
678 
fbCollectorSetUDPSpec(fbCollector_t * collector,fbUDPConnSpec_t * spec)679 static void fbCollectorSetUDPSpec(
680     fbCollector_t         *collector,
681     fbUDPConnSpec_t       *spec)
682 {
683     if (collector->udp_head == NULL) {
684         collector->udp_head = spec;
685         collector->udp_tail = spec;
686     } else if (collector->udp_head != spec) {
687         /* don't pick if it's new */
688         if (spec->prev || spec->next) {
689             /* connect last to next */
690             if (spec->prev) {
691                 spec->prev->next = spec->next;
692             }
693 
694             /* connect next to last last */
695             if (spec->next) {
696                 spec->next->prev = spec->prev;
697             } else {
698                 collector->udp_tail = spec->prev;
699             }
700 
701             spec->prev = NULL;
702             fbListenerSetPeerSession(collector->listener, spec->session);
703         }
704 
705         /* now set it in the front */
706         spec->next = collector->udp_head;
707         collector->udp_head->prev = spec;
708         collector->udp_head = spec;
709     }
710 }
711 
fbCollectorFreeUDPSpec(fbCollector_t * collector,fbUDPConnSpec_t * spec)712 static void fbCollectorFreeUDPSpec(
713     fbCollector_t          *collector,
714     fbUDPConnSpec_t        *spec)
715 {
716     /* let translators release state */
717     collector->cotimeOut(collector, spec->session);
718 
719     /* don't free the last session, fbufree will do that */
720     if (collector->udp_tail != collector->udp_head) {
721         fbSessionFree(spec->session);
722     }
723 
724     if (collector->udp_tail == spec) {
725         if (spec->prev) {
726             collector->udp_tail = spec->prev;
727             spec->prev->next = NULL;
728         } else {
729             collector->udp_tail = NULL;
730         }
731     }
732 
733     if (collector->multi_session) {
734         fbListenerAppFree(collector->listener, spec->ctx);
735     }
736 
737     g_slice_free(fbUDPConnSpec_t, spec);
738 }
739 
740 /**
741  * fbCollectorVerifyUDPPeer
742  *
743  *
744  *
745  */
fbCollectorVerifyUDPPeer(fbCollector_t * collector,struct sockaddr * from,socklen_t fromlen,GError ** err)746 static gboolean fbCollectorVerifyUDPPeer(
747     fbCollector_t   *collector,
748     struct sockaddr *from,
749     socklen_t       fromlen,
750     GError          **err)
751 {
752     fbUDPConnSpec_t    *udp = collector->udp_head;
753     gboolean           found = FALSE;
754 
755     /* stash the address if we've not seen it before */
756     /* compare the address if we have */
757     /* appinit should simulate no data (NLREAD) if message is from wrong peer*/
758 
759     if (collector->accept_only) {
760         if (collector->peer.so.sa_family == from->sa_family) {
761             if (from->sa_family == AF_INET) {
762                 if (memcmp(&(collector->peer.ip4.sin_addr),
763                            &(((struct sockaddr_in *)from)->sin_addr),
764                            sizeof(struct in_addr)))
765                 {
766                     g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
767                                 "Ignoring message from peer");
768                     return FALSE;
769                 }
770             } else if (from->sa_family == AF_INET6) {
771                 if(memcmp(&(collector->peer.ip6.sin6_addr),
772                           &(((struct sockaddr_in6 *)from)->sin6_addr),
773                           sizeof(struct in6_addr)))
774                 {
775                     g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
776                                 "Ignoring message from peer");
777                     return FALSE;
778                 }
779             }
780         }
781     } else {
782         memcpy(&(collector->peer.so), from,
783                (fromlen > sizeof(collector->peer)) ?
784                sizeof(collector->peer) : fromlen);
785     }
786 
787     while (udp) {
788         /* loop through and find current one */
789         if (udp->obdomain == collector->obdomain) {
790             if (!memcmp(&(udp->peer.so), from, udp->peerlen)) {
791                 /* we have a match - set session */
792                 fbCollectorSetUDPSpec(collector, udp);
793                 found = TRUE;
794                 break;
795             }
796         }
797         udp = udp->next;
798     }
799 
800 
801     if (!found) {
802         udp = g_slice_new0(fbUDPConnSpec_t);
803         memcpy(&(udp->peer.so), from, (fromlen > sizeof(udp->peer)) ?
804                sizeof(udp->peer) : fromlen);
805         udp->peerlen = (fromlen > sizeof(udp->peer)) ? sizeof(udp->peer) : fromlen;
806         udp->obdomain = collector->obdomain;
807         /* create a new session */
808         udp->session = fbListenerSetPeerSession(collector->listener, NULL);
809         fbCollectorSetUDPSpec(collector, udp);
810 
811         /* call app init for new UDP connection*/
812         if (collector->multi_session) {
813             if (!fbListenerCallAppInit(collector->listener, udp, err)) {
814                 udp->last_seen = collector->time;
815                 udp->reject = TRUE;
816                 return FALSE;
817             }
818         } else {
819             /* backwards compatibility -> need to associate the ctx with all
820                sessions */
821             udp->ctx = collector->ctx;
822         }
823     } else {
824         if (udp->reject) {
825             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
826                         "Rejecting previously rejected connection");
827             return FALSE;
828         }
829     }
830 
831     collector->ctx = udp->ctx;
832     udp->last_seen = collector->time;
833 
834     while (collector->udp_tail &&
835            (difftime(collector->time, collector->udp_tail->last_seen) >
836             FB_UDP_TIMEOUT))
837     {
838         /* timeout check */
839         fbCollectorFreeUDPSpec(collector, collector->udp_tail);
840     }
841 
842 
843     return TRUE;
844 }
845 
846 /**
847  * fbCollectorReadUDP
848  *
849  *
850  *
851  */
fbCollectorReadUDP(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)852 static gboolean fbCollectorReadUDP(
853     fbCollector_t   *collector,
854     uint8_t         *msgbase,
855     size_t          *msglen,
856     GError          **err)
857 {
858     uint16_t        msgSize = 0;
859     ssize_t         recvlen = 0;
860     int             rc;
861     union {
862         struct sockaddr         so;
863         struct sockaddr_in      ip4;
864         struct sockaddr_in6     ip6;
865     }                           peer;
866     socklen_t                   peerlen;
867 
868     memset(&peer, 0, sizeof(peer));
869 
870     rc = fbCollectorHandleSelect(collector);
871 
872     if (rc < 0) {
873         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
874                     "Interrupted by pipe");
875         /* interrupted by pipe read or other error with select*/
876         return FALSE;
877     }
878 
879     peerlen = sizeof(peer);
880     recvlen = recvfrom(collector->stream.fd, msgbase, *msglen, 0,
881                        (struct sockaddr *)&peer, &peerlen);
882 
883 
884     if (peer.so.sa_family == AF_INET6) {
885         peer.ip6.sin6_flowinfo = 0;
886         peer.ip6.sin6_scope_id = 0;
887     }
888 
889     if (!collector->comsgHeader(collector, msgbase, recvlen, &msgSize, err)) {
890         return FALSE;
891     }
892 
893     if (msgSize > 0) {
894         *msglen = msgSize;
895         /** Fixed this to do the right thing.  We now map ip
896          * addresses/port and observation domains to sessions.  If
897          * accept-only is set on the collector, we'll only return TRUE
898          * if the ip/ports match.  We will return NL_READ if FALSE, and the
899          * app using fixbuf should ignore error codes = NL_READ.**/
900 
901         /* this will only veto if we set accept from explicitly*/
902         if (!fbCollectorVerifyUDPPeer(collector, &(peer.so), peerlen, err)) {
903             return FALSE;
904         }
905         if (!collector->copostRead(collector, msgbase, msglen, err)) {
906             return FALSE;
907         }
908         return TRUE;
909     } else if (errno == EINTR || errno == EWOULDBLOCK) {
910         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD,
911                         "UDP read interrupt or timeout");
912         return FALSE;
913     } else {
914         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
915                     "UDP I/O error: %s", strerror(errno));
916         return FALSE;
917     }
918 }
919 
920 /**
921  * fbCollectorCloseSocket
922  *
923  *
924  *
925  */
fbCollectorCloseSocket(fbCollector_t * collector)926 static void fbCollectorCloseSocket(
927     fbCollector_t   *collector)
928 {
929     if (collector->stream.fd != -1) {
930         close(collector->stream.fd);
931         /* don't set to -1 because we need it to remove listener */
932     }
933 
934     if (collector->rip != -1) {
935         close(collector->rip);
936         collector->rip = -1;
937     }
938 
939     if (collector->wip != -1) {
940         close(collector->wip);
941         collector->wip = -1;
942     }
943 
944     collector->active = FALSE;
945 }
946 
947 /**
948  * fbCollectorAllocSocket
949  *
950  *
951  *
952  */
fbCollectorAllocSocket(fbListener_t * listener,void * ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)953 fbCollector_t *fbCollectorAllocSocket(
954     fbListener_t    *listener,
955     void            *ctx,
956     int             fd,
957     struct sockaddr *peer,
958     size_t          peerlen,
959     GError          **err)
960 {
961     fbCollector_t  *collector   = NULL;
962     fbConnSpec_t   *spec        = fbListenerGetConnSpec(listener);
963     int             pfd[2];
964 
965     /* Create a new collector */
966     collector = g_slice_new0(fbCollector_t);
967 
968     /* Fill it in */
969     collector->listener = listener;
970     collector->ctx = ctx;
971     collector->stream.fd = fd;
972     collector->bufferedStream = FALSE;
973     collector->active = TRUE;
974     collector->copostRead = fbCollectorPostProcNull;
975     collector->coreadLen = fbCollectorDecodeMsgVL;
976     collector->comsgHeader = fbCollectorMessageHeaderNull;
977     collector->coclose = fbCollectorCloseSocket;
978     collector->cotransClose = fbCollectorCloseTranslatorNull;
979     collector->cotimeOut = fbCollectorSessionTimeoutNull;
980     collector->translationActive = FALSE;
981     collector->multi_session = FALSE;
982 
983     /* Create interrupt pipe */
984     if (pipe(pfd)) {
985         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
986                     "Unable to create pipe on collector: %s", strerror(errno));
987         g_slice_free(fbCollector_t, collector);
988         return NULL;
989     }
990     collector->rip = pfd[0];
991     collector->wip = pfd[1];
992 
993     if (peerlen) {
994         memcpy(&(collector->peer.so), peer,
995                (peerlen > sizeof(collector->peer)) ?
996                     sizeof(collector->peer) : peerlen);
997     }
998 
999     /* Select a reader function */
1000     switch(spec->transport) {
1001 #if FB_ENABLE_SCTP
1002     case FB_SCTP:
1003         collector->coread = fbCollectorReadSCTP;
1004         break;
1005 #endif
1006     case FB_TCP:
1007         collector->coread = fbCollectorReadTCP;
1008         break;
1009     case FB_UDP:
1010         collector->coread = fbCollectorReadUDP;
1011         collector->comsgHeader = fbCollectorUDPMessageHeader;
1012         break;
1013     default:
1014         g_assert_not_reached();
1015     }
1016 
1017     /* All done */
1018     return collector;
1019 }
1020 
1021 #if HAVE_OPENSSL
1022 
1023 /**
1024  * fbCollectorReadTLS
1025  *
1026  *
1027  *
1028  */
fbCollectorReadTLS(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1029 static gboolean fbCollectorReadTLS(
1030     fbCollector_t   *collector,
1031     uint8_t         *msgbase,
1032     size_t          *msglen,
1033     GError          **err)
1034 {
1035     int                     rc;
1036     uint16_t                h_len, rrem;
1037     gboolean                rv;
1038 
1039     /* Read and decode version and length */
1040     g_assert(*msglen > 4);
1041     rrem = 4;
1042     while (rrem) {
1043         rc = SSL_read(collector->ssl, msgbase, rrem);
1044         if (rc > 0) {
1045             rrem -= rc;
1046             msgbase += rc;
1047         } else if (rc == 0) {
1048             /* FIXME this isn't _quite_ robust but it's good enough for now.
1049                we'll fix this when we do TLS/TCP stress testing. */
1050             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
1051                         "TLS connection shutdown");
1052             return FALSE;
1053         } else {
1054             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1055                         "TLS I/O error at message start: %s",
1056                         ERR_error_string(ERR_get_error(), NULL));
1057             while (ERR_get_error());
1058             return FALSE;
1059         }
1060     }
1061     rv = collector->coreadLen(collector,
1062                               (fbCollectorMsgVL_t *)(msgbase - 4),
1063                               *msglen, &h_len, err);
1064     if (rv == FALSE) return FALSE;
1065 
1066     /* read rest of message */
1067     rrem = h_len - 4;
1068     while (rrem) {
1069         rc = SSL_read(collector->ssl, msgbase, rrem);
1070         if (rc > 0) {
1071             rrem -= rc;
1072             msgbase += rc;
1073         } else {
1074             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1075                         "TLS I/O error in message: %s",
1076                         ERR_error_string(ERR_get_error(), NULL));
1077             while (ERR_get_error());
1078             return FALSE;
1079         }
1080     }
1081 
1082     /* All done. Return message length from header. */
1083     *msglen = h_len;
1084     return TRUE;
1085 }
1086 
1087 /**
1088  * fbCollectorCloseTLS
1089  *
1090  *
1091  *
1092  */
fbCollectorCloseTLS(fbCollector_t * collector)1093 static void fbCollectorCloseTLS(
1094     fbCollector_t   *collector)
1095 {
1096     SSL_shutdown(collector->ssl);
1097     SSL_free(collector->ssl);
1098     if (collector->rip != -1)
1099     {
1100         close(collector->rip);
1101         collector->rip = -1;
1102     }
1103 
1104     if (collector->wip != -1)
1105     {
1106         close(collector->wip);
1107         collector->wip = -1;
1108     }
1109 
1110     collector->active = FALSE;
1111 }
1112 
1113 /**
1114  * fbCollectorOpenTLS
1115  *
1116  *
1117  *
1118  */
fbCollectorOpenTLS(fbCollector_t * collector,GError ** err)1119 static gboolean fbCollectorOpenTLS(
1120     fbCollector_t   *collector,
1121     GError          **err)
1122 {
1123     fbConnSpec_t    *spec = fbListenerGetConnSpec(collector->listener);
1124     BIO             *conn;
1125     gboolean        ok = TRUE;
1126 
1127     /* Initialize SSL context if necessary */
1128     if (!spec->vssl_ctx) {
1129         if (!fbConnSpecInitTLS(spec, TRUE, err)) {
1130             return FALSE;
1131         }
1132     }
1133 
1134     /* wrap a stream BIO around the opened socket */
1135     if (!(conn = BIO_new_socket(collector->stream.fd, 1))) {
1136         ok = FALSE;
1137         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1138                     "couldn't wrap socket for TLS: %s",
1139                     ERR_error_string(ERR_get_error(), NULL));
1140         while (ERR_get_error());
1141         goto end;
1142     }
1143 
1144     /* create SSL socket */
1145     if (!(collector->ssl = SSL_new((SSL_CTX *)spec->vssl_ctx))) {
1146         ok = FALSE;
1147         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1148                     "couldnt create TLS socket: %s",
1149                     ERR_error_string(ERR_get_error(), NULL));
1150         while (ERR_get_error());
1151         goto end;
1152     }
1153 
1154     /* accept SSL connection */
1155     SSL_set_accept_state(collector->ssl);
1156     SSL_set_bio(collector->ssl, conn, conn);
1157     SSL_set_mode(collector->ssl, SSL_MODE_AUTO_RETRY);
1158     if (SSL_accept(collector->ssl) <= 0) {
1159         ok = FALSE;
1160         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1161                     "couldn't accept on connected TLS socket: %s",
1162                     ERR_error_string(ERR_get_error(), NULL));
1163         while (ERR_get_error());
1164         goto end;
1165     }
1166 
1167     /* FIXME do post-connection verification */
1168 
1169 end:
1170     if (!ok) {
1171         collector->active = FALSE;
1172         if (collector->ssl) {
1173             SSL_free(collector->ssl);
1174             collector->ssl = NULL;
1175         } else if (conn) {
1176             BIO_vfree(conn);
1177         }
1178     }
1179     return ok;
1180 }
1181 
1182 #if HAVE_OPENSSL_DTLS
1183 
1184 /**
1185  * fbCollectorOpenDTLS
1186  *
1187  *
1188  *
1189  */
fbCollectorOpenDTLS(fbCollector_t * collector,GError ** err)1190 static gboolean fbCollectorOpenDTLS(
1191     fbCollector_t   *collector,
1192     GError          **err)
1193 {
1194     fbConnSpec_t    *spec = fbListenerGetConnSpec(collector->listener);
1195     BIO             *conn;
1196     gboolean        ok = TRUE;
1197 
1198     /* Initialize SSL context if necessary */
1199     if (!spec->vssl_ctx) {
1200         if (!fbConnSpecInitTLS(spec, TRUE, err)) {
1201             return FALSE;
1202         }
1203     }
1204 
1205     /* wrap a stream BIO around the opened socket */
1206     if (!(conn = BIO_new_dgram(collector->stream.fd, 1))) {
1207         ok = FALSE;
1208         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1209                     "couldn't wrap socket for TLS: %s",
1210                     ERR_error_string(ERR_get_error(), NULL));
1211         while (ERR_get_error());
1212         goto end;
1213     }
1214 
1215     /* create SSL socket */
1216     if (!(collector->ssl = SSL_new((SSL_CTX *)spec->vssl_ctx))) {
1217         ok = FALSE;
1218         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1219                     "couldnt create TLS socket: %s",
1220                     ERR_error_string(ERR_get_error(), NULL));
1221         while (ERR_get_error());
1222         goto end;
1223     }
1224 
1225     /* Enable cookie exchange */
1226     SSL_set_options(collector->ssl, SSL_OP_COOKIE_EXCHANGE);
1227 
1228     /* accept SSL connection */
1229     SSL_set_bio(collector->ssl, conn, conn);
1230     SSL_set_accept_state(collector->ssl);
1231     SSL_set_mode(collector->ssl, SSL_MODE_AUTO_RETRY);
1232     if (SSL_accept(collector->ssl) <= 0) {
1233         ok = FALSE;
1234         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1235                     "couldn't accept on connected TLS socket: %s",
1236                     ERR_error_string(ERR_get_error(), NULL));
1237         while (ERR_get_error());
1238         goto end;
1239     }
1240 
1241     /* FIXME do post-connection verification */
1242 
1243 end:
1244     if (!ok) {
1245         collector->active = FALSE;
1246         if (collector->ssl) {
1247             SSL_free(collector->ssl);
1248             collector->ssl = NULL;
1249         } else if (conn) {
1250             BIO_vfree(conn);
1251         }
1252     }
1253     return ok;
1254 }
1255 
1256 #endif /* HAVE_OPENSSL_DTLS */
1257 
1258 /**
1259  * fbCollectorAllocTLS
1260  *
1261  *
1262  *
1263  */
fbCollectorAllocTLS(fbListener_t * listener,void * ctx,int fd,struct sockaddr * peer,size_t peerlen,GError ** err)1264 fbCollector_t *fbCollectorAllocTLS(
1265     fbListener_t    *listener,
1266     void            *ctx,
1267     int             fd,
1268     struct sockaddr *peer,
1269     size_t          peerlen,
1270     GError          **err)
1271 {
1272     gboolean        ok = TRUE;
1273     fbCollector_t   *collector = NULL;
1274     fbConnSpec_t    *spec = fbListenerGetConnSpec(listener);
1275 
1276     /* Create a new collector */
1277     collector = g_slice_new0(fbCollector_t);
1278 
1279     /* Fill it in */
1280     collector->listener = listener;
1281     collector->ctx = ctx;
1282     collector->stream.fd = fd;
1283     collector->bufferedStream = FALSE;
1284     collector->active = TRUE;
1285     collector->copostRead = fbCollectorPostProcNull;
1286     collector->coreadLen = fbCollectorDecodeMsgVL;
1287     collector->comsgHeader = fbCollectorMessageHeaderNull;
1288     collector->coread = fbCollectorReadTLS;
1289     collector->coclose = fbCollectorCloseTLS;
1290     collector->cotransClose = fbCollectorCloseTranslatorNull;
1291     collector->cotimeOut = fbCollectorSessionTimeoutNull;
1292     collector->translationActive = FALSE;
1293     if (peerlen) {
1294         memcpy(&(collector->peer.so), peer,
1295                (peerlen > sizeof(collector->peer)) ?
1296                     sizeof(collector->peer) : peerlen);
1297     }
1298 
1299 
1300     /* Do TLS accept atop opened socket */
1301     switch (spec->transport) {
1302     case FB_TLS_TCP:
1303         ok = fbCollectorOpenTLS(collector, err);
1304         break;
1305 #if HAVE_OPENSSL_DTLS
1306     case FB_DTLS_UDP:
1307 #if HAVE_OPENSSL_DTLS_SCTP
1308     case FB_DTLS_SCTP:
1309 #endif
1310         ok = fbCollectorOpenDTLS(collector, err);
1311     break;
1312 #endif
1313     default:
1314         g_assert_not_reached();
1315     }
1316 
1317     /* Nuke collector on TLS setup error */
1318     if (!ok) {
1319         g_slice_free(fbCollector_t, collector);
1320         return NULL;
1321     }
1322 
1323     /* All done */
1324     return collector;
1325 }
1326 
1327 #endif /* HAVE_OPENSSL */
1328 
1329 #if HAVE_SPREAD
1330 
fbCollectorSpreadOpen(fbCollector_t * collector,GError ** err)1331 static gboolean fbCollectorSpreadOpen(
1332     fbCollector_t  *collector,
1333     GError        **err)
1334 {
1335     int ret;
1336     int i = 0;
1337     char grp[MAX_GROUP_NAME];
1338     fbSpreadSpec_t *spread = collector->stream.spread;
1339 
1340     if (!spread->daemon)
1341     {
1342         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1343                      "Spread daemon name cannot be null" );
1344         return FALSE;
1345     }
1346     if (!spread->daemon[0])
1347     {
1348         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1349                      "Spread daemon name cannot be empty" );
1350         return FALSE;
1351     }
1352     /*if (strnlen( spread->daemon, 262 ) > 261)*/
1353     if ( !(memchr( spread->daemon, 0, 261)) )
1354     {
1355         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1356                      "Spread daemon name too long" );
1357         return FALSE;
1358     }
1359     if (!spread->groups)
1360     {
1361         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1362                      "Spread groups cannot be null" );
1363         return FALSE;
1364     }
1365     if (!spread->groups[0].name[0])
1366     {
1367         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1368                      "Spread groups cannot be empty" );
1369         return FALSE;
1370     }
1371     if (!spread->session)
1372     {
1373         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1374                      "Spread session cannot be null" );
1375         return FALSE;
1376     }
1377 
1378     ret = SP_connect( spread->daemon, 0, 0, 0, &(spread->recv_mbox),
1379                       spread->privgroup );
1380 
1381     if (ret != ACCEPT_SESSION)
1382     {
1383         g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1384                      "error connecting to Spread daemon %s: %s",
1385                      spread->daemon, fbConnSpreadError( ret ) );
1386         return FALSE;
1387     }
1388 
1389     /* mark it active here, fbCollectorFree() will need to disconnect */
1390     collector->active = TRUE;
1391 
1392     for (i = 0; i < spread->num_groups; ++i)
1393     {
1394         ret = SP_join( spread->recv_mbox, spread->groups[i].name);
1395         if (ret)
1396         {
1397             g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1398                          "error joining to Spread data group %s: %s",
1399                          spread->groups[i].name, fbConnSpreadError( ret ) );
1400             return FALSE;
1401         }
1402     }
1403 
1404     /* now that we have joined the data plane group, join the
1405      * control/template plane group to signal exporters that
1406      * we need the templates for this group. */
1407 
1408     for (i = 0; i < spread->num_groups; ++i)
1409     {
1410         memset( grp, 0, sizeof( grp ) );
1411         strncpy( grp, spread->groups[i].name, sizeof( grp) - 2 );
1412         strcat( grp, "T" );
1413         ret = SP_join( spread->recv_mbox, grp );
1414 
1415         if (ret) {
1416             g_set_error( err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1417                        "error joining to Spread control/template group %s: %s",
1418                          spread->groups[i].name, fbConnSpreadError( ret ) );
1419             return FALSE;
1420         }
1421     }
1422 
1423     return TRUE;
1424 }
1425 
1426 /* There is no good way to deal with sequence numbers in Spread.
1427  * This was added to return False if the collector receives
1428  * a message where it is not the first group listed in msg.
1429  * This is because the exporter only looks at the first group
1430  * when deciding what sequence number to export */
1431 
fbCollectorSpreadPostProc(fbCollector_t * collector,uint8_t * buffer,size_t * b_len,GError ** err)1432 static gboolean fbCollectorSpreadPostProc(
1433     fbCollector_t *collector,
1434     uint8_t       *buffer,
1435     size_t        *b_len,
1436     GError        **err)
1437 {
1438     if (fbCollectorTestGroupMembership(collector, 0)) {
1439         return TRUE;
1440     }
1441     (void) collector;
1442     (void) buffer;
1443     (void) b_len;
1444     (void) err;
1445 
1446     return FALSE;
1447 }
1448 
fbCollectorGetSpreadReturnGroups(fbCollector_t * collector,char * groups[])1449 int fbCollectorGetSpreadReturnGroups(
1450     fbCollector_t *collector,
1451     char *groups[])
1452 {
1453     int loop = 0;
1454     fbSpreadSpec_t *spread = collector->stream.spread;
1455 
1456     for ( loop = 0; loop < spread->recv_num_groups; loop++){
1457         groups[loop] = spread->recv_groups[loop].name;
1458     }
1459 
1460     return spread->recv_num_groups;
1461 }
1462 
fbCollectorSpreadRead(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1463 static gboolean fbCollectorSpreadRead(
1464     fbCollector_t  *collector,
1465     uint8_t        *msgbase,
1466     size_t         *msglen,
1467     GError        **err)
1468 {
1469     fbSpreadSpec_t *spread = collector->stream.spread;
1470 
1471     service         service_type = 0;
1472     char            sender[MAX_GROUP_NAME];
1473     int16           mess_type = 0;
1474     int             endian_mismatch;
1475     int             no_mess = 1;
1476     int             ret;
1477 
1478     do {
1479 
1480         ret = SP_receive( spread->recv_mbox, &service_type, sender,
1481                           spread->recv_max_groups,
1482                           &(spread->recv_num_groups),
1483                           (char (*)[])spread->recv_groups,
1484                           &mess_type, &endian_mismatch, *msglen,
1485                           (char *)msgbase );
1486 
1487         if (spread->recv_exit) {
1488             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOF,
1489                         "End of file: spread shut down or was not connected");
1490             return FALSE;
1491         }
1492 
1493         if (ret < 0) {
1494             if (ret == GROUPS_TOO_SHORT) {
1495                 g_free(spread->recv_groups);
1496                 spread->recv_max_groups = -spread->recv_num_groups;
1497                 spread->recv_groups = g_new0( sp_groupname_t,
1498                                               spread->recv_max_groups );
1499             } else if (ret == BUFFER_TOO_SHORT) {
1500                 *msglen = -endian_mismatch;
1501                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_EOM,
1502                             "msglen too small (%zd required)", *msglen);
1503                 return FALSE;
1504             } else if ((ret == CONNECTION_CLOSED) || (ret == ILLEGAL_SESSION))
1505             {
1506                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1507                             "End of file: %s", fbConnSpreadError(ret));
1508                 return FALSE;
1509             } else {
1510                 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
1511                             "error(%d) receiving Spread message: %s", ret,
1512                             fbConnSpreadError(ret));
1513 
1514                 *msglen = 0;
1515                 return FALSE;
1516             }
1517         } else {
1518             *msglen = ret;
1519             no_mess = 0;
1520         }
1521 
1522     } while (no_mess);
1523 
1524     return TRUE;
1525 }
1526 
1527 
1528 
fbCollectorSpreadClose(fbCollector_t * collector)1529 static void fbCollectorSpreadClose(
1530     fbCollector_t *collector)
1531 {
1532     if (collector->active) {
1533         SP_disconnect( collector->stream.spread->recv_mbox );
1534     }
1535     collector->active = FALSE;
1536 }
1537 
fbCollectorAllocSpread(void * ctx,fbSpreadParams_t * params,GError ** err)1538 fbCollector_t *fbCollectorAllocSpread(
1539     void              *ctx,
1540     fbSpreadParams_t  *params,
1541     GError           **err)
1542 {
1543     fbCollector_t *collector = g_slice_new0( fbCollector_t );
1544 
1545     collector->ctx = ctx;
1546     collector->stream.spread = fbConnSpreadCopy( params );
1547     collector->bufferedStream = FALSE;
1548     collector->active = FALSE;
1549     collector->spread_active = 1;
1550 
1551     collector->coread = fbCollectorSpreadRead;
1552     collector->coreadLen = fbCollectorDecodeMsgVL;  /* after SCTP */
1553     collector->copostRead = fbCollectorSpreadPostProc; /* after SCTP */
1554     collector->comsgHeader = fbCollectorMessageHeaderNull; /* after SCTP */
1555     collector->coclose = fbCollectorSpreadClose;
1556     collector->cotransClose = fbCollectorCloseTranslatorNull; /* after SCTP */
1557     collector->cotimeOut = fbCollectorSessionTimeoutNull;
1558 
1559     collector->translationActive = FALSE;
1560 
1561     if (!fbCollectorSpreadOpen( collector, err )) {
1562         fbCollectorFree( collector );
1563         return 0;
1564     }
1565 
1566     return collector;
1567 }
1568 
fbCollectorTestGroupMembership(fbCollector_t * collector,int group_offset)1569 gboolean fbCollectorTestGroupMembership(
1570     fbCollector_t *collector,
1571     int            group_offset)
1572 {
1573     int loop;
1574     fbSpreadSpec_t *spread = NULL;
1575 
1576     if (!collector) {
1577         return TRUE;
1578     }
1579 
1580     if (!collector->spread_active) {
1581         return TRUE;
1582     }
1583 
1584     spread = collector->stream.spread;
1585     for (loop = 0; loop < spread->num_groups; loop++) {
1586         if (strcmp(spread->recv_groups[group_offset].name,
1587                    spread->groups[loop].name) == 0)
1588         {
1589             fbSessionSetGroup(spread->session,
1590                               (char *)spread->recv_groups[group_offset].name);
1591             return TRUE;
1592         }
1593     }
1594 
1595     return FALSE;
1596 }
1597 
1598 
1599 #endif /* HAVE_SPREAD */
1600 
1601 /**
1602  * fbCollectMessage
1603  *
1604  *
1605  *
1606  */
fbCollectMessage(fbCollector_t * collector,uint8_t * msgbase,size_t * msglen,GError ** err)1607 gboolean        fbCollectMessage(
1608     fbCollector_t   *collector,
1609     uint8_t         *msgbase,
1610     size_t          *msglen,
1611     GError          **err)
1612 {
1613     /* Ensure stream is open */
1614     if (!collector->active) {
1615         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1616                     "Collector not active");
1617         return FALSE;
1618     }
1619 
1620     /* Attempt to read message */
1621     if (collector->coread(collector, msgbase, msglen, err)) return TRUE;
1622 
1623     /* Read failure; signal error */
1624     return FALSE;
1625 }
1626 
1627 /**
1628  * fbCollectorGetContext
1629  *
1630  *
1631  *
1632  */
fbCollectorGetContext(fbCollector_t * collector)1633 void          *fbCollectorGetContext(
1634     fbCollector_t   *collector)
1635 {
1636     return collector->ctx;
1637 }
1638 
1639 
1640 /**
1641  * fbCollectorHasTranslator
1642  *
1643  * use this is check to see if a protocol translation
1644  * is in use for this collector.  Needed by the transcode
1645  * and IPFIX machinery to get rid of some error checks
1646  * which no longer apply.
1647  *
1648  * @param collector pointer to the collector state struct
1649  *
1650  * @return TRUE if a translator is in use, FALSE otherwise
1651  */
fbCollectorHasTranslator(fbCollector_t * collector)1652 gboolean        fbCollectorHasTranslator(
1653     fbCollector_t   *collector)
1654 {
1655     return collector->translationActive;
1656 }
1657 
1658 /**
1659  * fbCollectorGetFD
1660  *
1661  *
1662  *
1663  */
fbCollectorGetFD(fbCollector_t * collector)1664 int             fbCollectorGetFD(
1665     fbCollector_t   *collector)
1666 {
1667     return collector->stream.fd;
1668 }
1669 
1670 /**
1671  * fbCollectorSetFD
1672  *
1673  *
1674  *
1675  */
fbCollectorSetFD(fbCollector_t * collector,int fd)1676 void             fbCollectorSetFD(
1677     fbCollector_t   *collector,
1678     int              fd)
1679 {
1680     if (collector) {
1681         collector->stream.fd = fd;
1682     }
1683 }
1684 
1685 
1686 /**
1687  * fbCollectorClose
1688  *
1689  *
1690  *
1691  */
fbCollectorClose(fbCollector_t * collector)1692 void            fbCollectorClose(
1693     fbCollector_t   *collector)
1694 {
1695     if (collector->active && collector->coclose) collector->coclose(collector);
1696 
1697     if (collector->listener) {
1698         fbListenerRemove(collector->listener, collector->stream.fd);
1699     }
1700 
1701 }
1702 
1703 /**
1704  * fbCollectorFree
1705  *
1706  *
1707  *
1708  */
fbCollectorFree(fbCollector_t * collector)1709 void            fbCollectorFree(
1710     fbCollector_t   *collector)
1711 {
1712     if (!collector->multi_session) {
1713         fbListenerAppFree(collector->listener, collector->ctx);
1714     }
1715     collector->cotransClose(collector);
1716     fbCollectorClose(collector);
1717 #if HAVE_SPREAD
1718     if (collector->coclose == fbCollectorSpreadClose) {
1719         fbConnSpreadFree( collector->stream.spread );
1720     }
1721 #endif
1722     while (collector->udp_tail) {
1723         fbCollectorFreeUDPSpec(collector, collector->udp_tail);
1724     }
1725 
1726     g_slice_free(fbCollector_t, collector);
1727 }
1728 
1729 
1730 /**
1731  * fbCollectorClearTranslator
1732  *
1733  * @param collector the collector on which to remove
1734  *        the translator
1735  *
1736  * @return TRUE on success, FALSE on failure
1737  */
fbCollectorClearTranslator(fbCollector_t * collector,GError ** err)1738 gboolean    fbCollectorClearTranslator(
1739     fbCollector_t   *collector,
1740     GError          **err __attribute__((unused)) )
1741 {
1742     collector->cotransClose(collector);
1743 
1744     return TRUE;
1745 }
1746 
1747 
1748 /**
1749  * fbCollectorSetTranslator
1750  *
1751  * this sets the collector input to any
1752  * given translator
1753  *
1754  * @param collector the collector to apply the protocol
1755  *        convertor to
1756  * @param postProcFunc a function called after the read
1757  *        to do any post processing/conversion to turn
1758  *        the buffer into an IPFIX buffer
1759  * @param vlMessageFunc function to determine the
1760  *        amount needed to complete the next read
1761  * @param headerFunc function to transform the header after
1762  *        a block read before it is sent to the
1763  *        postProcFunc (called when vlMessageFunc isn't)
1764  * @param trCloseFunc if anything is needed to be cleaned
1765  *        up in the translator when a collector is closed
1766  *        this function will be called before the collector
1767  *        is closed
1768  * @param timeOutFunc when UDP sessions timeout, this function will
1769  *        clear any state associated with the session.
1770  * @param opaque a void pointer to hold a translator
1771  *        specific state structure
1772  * @param err holds the glib based error message on
1773  *        error
1774  *
1775  * @return TRUE on success, FALSE on error
1776  */
fbCollectorSetTranslator(fbCollector_t * collector,fbCollectorPostProc_fn postProcFunc,fbCollectorVLMessageSize_fn vlMessageFunc,fbCollectorMessageHeader_fn headerFunc,fbCollectorTransClose_fn trCloseFunc,fbCollectorSessionTimeout_fn timeOutFunc,void * opaque,GError ** err)1777 gboolean    fbCollectorSetTranslator(
1778     fbCollector_t                *collector,
1779     fbCollectorPostProc_fn       postProcFunc,
1780     fbCollectorVLMessageSize_fn  vlMessageFunc,
1781     fbCollectorMessageHeader_fn  headerFunc,
1782     fbCollectorTransClose_fn     trCloseFunc,
1783     fbCollectorSessionTimeout_fn timeOutFunc,
1784     void                         *opaque,
1785     GError                       **err)
1786 {
1787     if (NULL != collector->translatorState)
1788     {
1789         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_TRANSMISC,
1790             "Translator is already set on this collector, "
1791             "must be cleared first");
1792         return FALSE;
1793     }
1794 
1795     collector->copostRead = postProcFunc;
1796     collector->coreadLen = vlMessageFunc;
1797     collector->comsgHeader = headerFunc;
1798     collector->cotransClose = trCloseFunc;
1799     collector->cotimeOut = timeOutFunc;
1800     collector->translatorState = opaque;
1801     collector->translationActive = TRUE;
1802 
1803     return TRUE;
1804 }
1805 
fbCollectorGetPeer(fbCollector_t * collector)1806 struct sockaddr* fbCollectorGetPeer(
1807     fbCollector_t   *collector)
1808 {
1809     return (&collector->peer.so);
1810 }
1811 
fbCollectorInterruptSocket(fbCollector_t * collector)1812 void fbCollectorInterruptSocket(
1813     fbCollector_t   *collector)
1814 {
1815     uint8_t     byte = 0xe7;
1816 
1817 #if HAVE_SPREAD
1818     if (collector->spread_active) {
1819         fbCollectorSpreadClose(collector);
1820         return;
1821     }
1822 #endif
1823 
1824     write(collector->wip, &byte, sizeof(byte));
1825     write(collector->rip, &byte, sizeof(byte));
1826 }
1827 
fbCollectorRemoveListenerLastBuf(fBuf_t * fbuf,fbCollector_t * collector)1828 void fbCollectorRemoveListenerLastBuf(
1829     fBuf_t         *fbuf,
1830     fbCollector_t  *collector)
1831 {
1832     /* may not have a listener - esp for spread */
1833     if (collector->listener) {
1834         fbListenerRemoveLastBuf(fbuf, collector->listener);
1835     }
1836 }
1837 
fbCollectorGetObservationDomain(fbCollector_t * collector)1838 uint32_t fbCollectorGetObservationDomain(
1839     fbCollector_t   *collector)
1840 {
1841     if (!collector) {
1842         return 0;
1843     }
1844 
1845     return collector->obdomain;
1846 }
1847 
fbCollectorSetAcceptOnly(fbCollector_t * collector,struct sockaddr * address,size_t address_length)1848 void fbCollectorSetAcceptOnly(
1849     fbCollector_t   *collector,
1850     struct sockaddr *address,
1851     size_t           address_length)
1852 {
1853     g_assert(address);
1854     collector->accept_only = TRUE;
1855 
1856     memcpy(&(collector->peer.so), address,
1857            (address_length > sizeof(collector->peer)) ?
1858            sizeof(collector->peer) : address_length);
1859 }
1860 
fbCollectorSetUDPMultiSession(fbCollector_t * collector,gboolean multi_session)1861 void fbCollectorSetUDPMultiSession(
1862     fbCollector_t *collector,
1863     gboolean       multi_session)
1864 {
1865     collector->multi_session = multi_session;
1866 }
1867