1 /**
2  * @internal
3  *
4  * @file fbexporter.c
5  *
6  * IPFIX Exporting Process single transport session implementation
7  *
8  ** ------------------------------------------------------------------------
9  ** Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
10  ** ------------------------------------------------------------------------
11  ** Authors: Brian Trammell
12  ** ------------------------------------------------------------------------
13  ** @OPENSOURCE_LICENSE_START@
14  ** libfixbuf 2.0
15  **
16  ** Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
17  **
18  ** NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
19  ** ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
20  ** BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
21  ** EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
22  ** LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
23  ** EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
24  ** MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
25  ** ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
26  ** COPYRIGHT INFRINGEMENT.
27  **
28  ** Released under a GNU-Lesser GPL 3.0-style license, please see
29  ** LICENSE.txt or contact permission@sei.cmu.edu for full terms.
30  **
31  ** [DISTRIBUTION STATEMENT A] This material has been approved for
32  ** public release and unlimited distribution.  Please see Copyright
33  ** notice for non-US Government use and distribution.
34  **
35  ** Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
36  ** and Trademark Office by Carnegie Mellon University.
37  **
38  ** DM18-0325
39  ** @OPENSOURCE_LICENSE_END@
40  ** ------------------------------------------------------------------------
41  *
42  */
43 
44 #define _FIXBUF_SOURCE_
45 /*#define _GNU_SOURCE*/
46 #include <fixbuf/private.h>
47 
48 
49 /**
50  * If set in exporter SCTP mode, use simple automatic stream selection as
51  * specified in the IPFIX protocol without flexible stream selection: send
52  * templates on stream 0, and data on stream 1.
53  */
54 #define FB_F_SCTP_AUTOSTREAM        0x80000000
55 
56 /**
57  * If set in exporter SCTP mode, use TTL-based partial reliability for
58  * non-template messages.
59  */
60 #define FB_F_SCTP_PR_TTL            0x40000000
61 
62 typedef gboolean    (*fbExporterOpen_fn)(
63     fbExporter_t                *exporter,
64     GError                      **err);
65 
66 typedef gboolean    (*fbExporterWrite_fn)(
67     fbExporter_t                *exporter,
68     uint8_t                     *msgbase,
69     size_t                      msglen,
70     GError                      **err);
71 
72 typedef void        (*fbExporterClose_fn)(
73     fbExporter_t                *exporter);
74 
75 struct fbExporter_st {
76     /** Specifier used for stream open */
77     union {
78         fbConnSpec_t            *conn;
79 #if HAVE_SPREAD
80         fbSpreadSpec_t          *spread;
81 #endif
82         char                    *path;
83     }                           spec;
84     /** Current export stream */
85     union {
86         /** Buffered file pointer, for file transport */
87         FILE                    *fp;
88         /** Buffer for data if providing own transport */
89         uint8_t                 *buffer;
90         /**
91          * Unbuffered socket, for SCTP, TCP, or UDP transport.
92          * Also used as base socket for TLS and DTLS support.
93          */
94         int                     fd;
95     }                           stream;
96     /** SCTP mode. Union of FB_SCTP_F_* flags. */
97     uint32_t                    sctp_mode;
98     /** Next SCTP stream */
99     int                         sctp_stream;
100     /** Partial reliability parameter (see mode) */
101     int                         sctp_pr_param;
102 #if HAVE_OPENSSL
103     /** OpenSSL socket, for TLS or DTLS over the socket in fd. */
104     SSL                         *ssl;
105 #endif
106     gboolean                    active;
107     size_t                      msg_len;
108     fbExporterOpen_fn           exopen;
109     fbExporterWrite_fn          exwrite;
110     fbExporterClose_fn          exclose;
111     uint16_t                    mtu;
112 };
113 
114 /**
115  *fbExporterOpenFile
116  *
117  *
118  * @param exporter
119  * @param err
120  *
121  * @return
122  */
fbExporterOpenFile(fbExporter_t * exporter,GError ** err)123 static gboolean fbExporterOpenFile(
124     fbExporter_t                *exporter,
125     GError                      **err)
126 {
127     /* check to see if we're opening stdout */
128     if ((strlen(exporter->spec.path) == 1) &&
129         (exporter->spec.path[0] == '-'))
130     {
131         /* don't open a terminal */
132         if (isatty(fileno(stdout))) {
133             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
134                         "Refusing to open stdout terminal for export");
135             return FALSE;
136         }
137 
138         /* yep, stdout */
139         exporter->stream.fp = stdout;
140     } else {
141         /* nope, just a regular file */
142         exporter->stream.fp = fopen(exporter->spec.path, "w");
143     }
144 
145     /* check for error */
146     if (!exporter->stream.fp) {
147         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
148                     "Couldn't open %s for export: %s",
149                     exporter->spec.path, strerror(errno));
150         return FALSE;
151     }
152 
153     /* set active flag */
154     exporter->active = TRUE;
155 
156     return TRUE;
157 }
158 
159 /**
160  *fbExporterWriteFile
161  *
162  *
163  * @param exporter
164  * @param msgbase
165  * @param msglen
166  * @param err
167  *
168  * @return
169  */
fbExporterWriteFile(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)170 static gboolean fbExporterWriteFile(
171     fbExporter_t                *exporter,
172     uint8_t                     *msgbase,
173     size_t                      msglen,
174     GError                      **err)
175 {
176     size_t                      rc;
177 
178     rc = fwrite(msgbase, 1, msglen, exporter->stream.fp);
179 
180     if (rc == msglen) {
181         return TRUE;
182     } else {
183         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
184                     "Couldn't write %u bytes to %s: %s",
185                     (uint32_t)msglen, exporter->spec.path, strerror(errno));
186         return FALSE;
187     }
188 }
189 
190 /**
191  *fbExporterCloseFile
192  *
193  *
194  * @param exporter
195  *
196  */
fbExporterCloseFile(fbExporter_t * exporter)197 static void fbExporterCloseFile(
198     fbExporter_t                *exporter)
199 {
200     if (exporter->stream.fp == stdout) {
201         fflush(exporter->stream.fp);
202     } else {
203         fclose(exporter->stream.fp);
204     }
205     exporter->stream.fp = NULL;
206     exporter->active = FALSE;
207 }
208 
209 /**
210  *fbExporterAllocFile
211  *
212  *
213  * @param path
214  * @param exporter
215  *
216  * @return
217  */
fbExporterAllocFile(const char * path)218 fbExporter_t    *fbExporterAllocFile(
219     const char      *path)
220 {
221     fbExporter_t    *exporter;
222 
223     /* Create a new exporter */
224     exporter = g_slice_new0(fbExporter_t);
225 
226     /* Copy the path */
227     exporter->spec.path = g_strdup(path);
228 
229     /* Set up stream management functions */
230     exporter->exopen = fbExporterOpenFile;
231     exporter->exwrite = fbExporterWriteFile;
232     exporter->exclose = fbExporterCloseFile;
233     exporter->mtu = 65496;
234 
235     return exporter;
236 }
237 
238 /**
239  * fbExporterOpenBuffer
240  *
241  *
242  */
fbExporterOpenBuffer(fbExporter_t * exporter,GError ** err)243 static gboolean fbExporterOpenBuffer(
244     fbExporter_t                *exporter,
245     GError                      **err)
246 {
247     (void)err;
248     /* set active flag */
249     exporter->active = TRUE;
250 
251     return TRUE;
252 }
253 
254 /**
255  *fbExporterCloseBuffer
256  *
257  *
258  * @param exporter
259  *
260  */
fbExporterCloseBuffer(fbExporter_t * exporter)261 static void fbExporterCloseBuffer(
262     fbExporter_t              *exporter)
263 {
264     exporter->active = FALSE;
265 }
266 
fbExporterWriteBuffer(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)267 static gboolean fbExporterWriteBuffer(
268     fbExporter_t             *exporter,
269     uint8_t                  *msgbase,
270     size_t                   msglen,
271     GError                   **err)
272 {
273     (void)err;
274     memcpy(exporter->stream.buffer, msgbase, msglen);
275     exporter->msg_len = msglen;
276 
277     return TRUE;
278 
279 }
280 
281 /**
282  * fbExporterAllocBuffer
283  *
284  *
285  *
286  */
fbExporterAllocBuffer(uint8_t * buf,uint16_t bufsize)287 fbExporter_t *fbExporterAllocBuffer(
288     uint8_t                   *buf,
289     uint16_t                   bufsize)
290 {
291     fbExporter_t *exporter = NULL;
292 
293     exporter = g_slice_new0(fbExporter_t);
294     exporter->exwrite = fbExporterWriteBuffer;
295     exporter->exopen = fbExporterOpenBuffer;
296     exporter->exclose = fbExporterCloseBuffer;
297     exporter->mtu = bufsize;
298     exporter->stream.buffer = buf;
299 
300     return exporter;
301 }
302 
303 /**
304  *fbExporterAllocFP
305  *
306  * @param fp
307  *
308  * @return
309  */
fbExporterAllocFP(FILE * fp)310 fbExporter_t    *fbExporterAllocFP(
311     FILE            *fp)
312 {
313     fbExporter_t    *exporter;
314 
315     /* Create a new exporter */
316     exporter = g_slice_new0(fbExporter_t);
317 
318     /* Reference the path */
319     exporter->spec.path = g_strdup("FP");
320 
321     /* Set up stream management functions */
322     exporter->exwrite = fbExporterWriteFile;
323     exporter->mtu = 65496;
324 
325     /* set active flag */
326     exporter->active = TRUE;
327 
328     /* set file pointer */
329     exporter->stream.fp = fp;
330 
331     return exporter;
332 }
333 
334 /**
335  *fbExporterIgnoreSigpipe
336  *
337  *
338  */
fbExporterIgnoreSigpipe(void)339 static void fbExporterIgnoreSigpipe(
340     void)
341 {
342     static gboolean ignored = FALSE;
343     struct sigaction sa, osa;
344 
345     if (ignored) return;
346 
347     sa.sa_handler = SIG_IGN;
348     sigemptyset(&sa.sa_mask);
349     sa.sa_flags = SA_RESTART;
350     if (sigaction(SIGPIPE,&sa,&osa)) {
351         g_error("sigaction(SIGPIPE) failed: %s", strerror(errno));
352     }
353 
354     ignored = TRUE;
355 }
356 
357 /**
358  *fbExporterMaxSendbuf
359  *
360  *
361  * @param sock
362  * @param size
363  *
364  * @return
365  */
fbExporterMaxSendbuf(int sock,int * size)366 static gboolean fbExporterMaxSendbuf(
367     int         sock,
368     int         *size)
369 {
370     while (*size > 4096) {
371         if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, size,
372                        sizeof(*size)) == 0)
373         {
374             return TRUE;
375         }
376         if (errno != ENOBUFS) {
377             return FALSE;
378         }
379         *size -= (*size > 1024 * 1024)
380                         ? 1024 * 1024
381                         : 2048;
382     }
383     return FALSE;
384 }
385 
386 #define FB_SOCKBUF_DEFAULT (4 * 1024 * 1024)
387 
388 /**
389  *fbExporterOpenSocket
390  *
391  * @param exporter
392  * @param err
393  *
394  * @return
395  */
fbExporterOpenSocket(fbExporter_t * exporter,GError ** err)396 static gboolean fbExporterOpenSocket(
397     fbExporter_t                *exporter,
398     GError                      **err)
399 {
400     struct addrinfo             *ai = NULL;
401     int                         sockbuf_sz = FB_SOCKBUF_DEFAULT;
402 
403     /* Turn the exporter connection specifier into an addrinfo */
404     if (!fbConnSpecLookupAI(exporter->spec.conn, FALSE, err)) return FALSE;
405     ai = (struct addrinfo *)exporter->spec.conn->vai;
406 
407     /* ignore sigpipe if we're doing TCP or SCTP export */
408     if ((exporter->spec.conn->transport == FB_TCP) ||
409         (exporter->spec.conn->transport == FB_TLS_TCP)
410 #if FB_ENABLE_SCTP
411         || (exporter->spec.conn->transport == FB_SCTP)
412         || (exporter->spec.conn->transport == FB_DTLS_SCTP)
413 #endif
414     ) {
415         fbExporterIgnoreSigpipe();
416     }
417 
418     /* open socket of appropriate type for connection specifier */
419     do {
420 #if FB_ENABLE_SCTP
421         if ((exporter->spec.conn->transport == FB_SCTP) ||
422             (exporter->spec.conn->transport == FB_DTLS_SCTP))
423         {
424                 /* Kludge for SCTP. addrinfo doesn't accept SCTP hints. */
425                 ai->ai_socktype = SOCK_STREAM;
426                 ai->ai_protocol = IPPROTO_SCTP;
427         }
428 #endif
429 
430         exporter->stream.fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
431         if (exporter->stream.fd < 0) continue;
432         if (connect(exporter->stream.fd, ai->ai_addr, ai->ai_addrlen) == 0) break;
433         close(exporter->stream.fd);
434     } while ((ai = ai->ai_next));
435 
436     /* check for no openable socket */
437     if (ai == NULL) {
438         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
439                     "couldn't create connected TCP socket to %s:%s %s",
440                     exporter->spec.conn->host, exporter->spec.conn->svc,
441                     strerror(errno));
442         return FALSE;
443     }
444 
445     /* increase send buffer size for UDP */
446     if ((exporter->spec.conn->transport == FB_UDP) ||
447         (exporter->spec.conn->transport == FB_DTLS_UDP))
448     {
449         if (!fbExporterMaxSendbuf(exporter->stream.fd, &sockbuf_sz)) {
450             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
451                         "couldn't set socket buffer size on %s: %s",
452                         exporter->spec.conn->host, strerror(errno));
453             close(exporter->stream.fd);
454             return FALSE;
455         }
456     }
457 
458     /* set active flag */
459     exporter->active = TRUE;
460 
461     return TRUE;
462 }
463 
464 #if FB_ENABLE_SCTP
465 
466 /**
467  *fbExporterWriteSCTP
468  *
469  *
470  * @param exporter
471  * @param msgbase
472  * @param msglen
473  * @param err
474  *
475  * @return
476  */
fbExporterWriteSCTP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)477 static gboolean fbExporterWriteSCTP(
478     fbExporter_t                *exporter,
479     uint8_t                     *msgbase,
480     size_t                      msglen,
481     GError                      **err)
482 {
483     ssize_t                     rc;
484     uint16_t                    initial_setid;
485     gboolean                    is_template;
486     int                         sctp_flags = 0;
487     uint32_t                    sctp_ttl = 0;
488 
489     /* Check to see if this is a template message */
490     initial_setid = *(uint16_t *)(msgbase + 16);
491     if (initial_setid == FB_TID_TS || initial_setid == FB_TID_OTS) {
492         is_template = TRUE;
493     } else {
494         is_template = FALSE;
495     }
496 
497     /* Do automatic stream selection if requested. */
498     if (exporter->sctp_mode & FB_F_SCTP_AUTOSTREAM) {
499         if (is_template) {
500             exporter->sctp_stream = 0;
501         } else {
502             exporter->sctp_stream = 1;
503         }
504     }
505 
506     /* Use partial reliability if requested for non-template messages */
507     if (!is_template && (exporter->sctp_mode & FB_F_SCTP_PR_TTL)) {
508         sctp_flags |= FB_F_SCTP_PR_TTL;
509         sctp_ttl = exporter->sctp_pr_param;
510     }
511 
512     rc = sctp_sendmsg(exporter->stream.fd, msgbase, msglen,
513                       NULL, 0,      /* destination sockaddr */
514                       0,            /* payload protocol */
515                       sctp_flags,   /* flags */
516                       exporter->sctp_stream,  /* stream */
517                       sctp_ttl,     /* message lifetime (ms) */
518                       0);           /* context */
519 
520     if (rc == (ssize_t)msglen) {
521         return TRUE;
522     } else if (rc == -1) {
523         if (errno == EPIPE) {
524             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLWRITE,
525                         "Connection reset (EPIPE) on SCTP write");
526         } else {
527             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
528                         "I/O error: %s", strerror(errno));
529         }
530         return FALSE;
531     } else {
532         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
533                     "short write: wrote %d while writing %lu",
534                     (int)rc, msglen);
535         return FALSE;
536     }
537 }
538 
539 #endif /* FB_ENABLE_SCTP */
540 
541 
542 /**
543  *fbExporterWriteTCP
544  *
545  *
546  * @param exporter
547  * @param msgbase
548  * @param msglen
549  * @param err
550  *
551  * @return
552  */
fbExporterWriteTCP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)553 static gboolean fbExporterWriteTCP(
554     fbExporter_t                *exporter,
555     uint8_t                     *msgbase,
556     size_t                      msglen,
557     GError                      **err)
558 {
559     ssize_t                     rc;
560 
561     rc = write(exporter->stream.fd, msgbase, msglen);
562     if (rc == (ssize_t)msglen) {
563         return TRUE;
564     } else if (rc == -1) {
565         if (errno == EPIPE) {
566             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLWRITE,
567                         "Connection reset (EPIPE) on TCP write");
568         } else {
569             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
570                         "I/O error: %s", strerror(errno));
571         }
572         return FALSE;
573     } else {
574         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
575                     "short write: wrote %d while writing %u",
576                     (int)rc, (uint32_t)msglen);
577         return FALSE;
578     }
579 }
580 
581 /**
582  * fbExporterWriteUDP
583  *
584  *
585  * @param exporter
586  * @param msgbase
587  * @param msglen
588  * @param err
589  *
590  * @return
591  */
fbExporterWriteUDP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)592 static gboolean fbExporterWriteUDP(
593     fbExporter_t                *exporter,
594     uint8_t                     *msgbase,
595     size_t                      msglen,
596     GError                      **err)
597 {
598     static gboolean             sendGood = TRUE;
599     ssize_t                     rc;
600 
601     /* Send the buffer as a single message */
602     rc = send(exporter->stream.fd, msgbase, msglen, 0);
603 
604     /* Deal with the results */
605     if (rc == (ssize_t)msglen) {
606         return TRUE;
607     } else if (rc == -1) {
608         if (TRUE == sendGood) {
609             g_warning( "I/O error on UDP send: %s (socket closed on receiver?)",
610                 strerror(errno));
611             g_warning("packets will be lost");
612             send(exporter->stream.fd, msgbase, msglen, 0);
613             sendGood = FALSE;
614             return TRUE;
615         }
616         return TRUE;
617     } else {
618         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
619                     "Short write on UDP send: wrote %d while writing %u",
620                     (int)rc, (uint32_t)msglen);
621         return FALSE;
622     }
623 }
624 
625 /**
626  *fbExporterCloseSocket
627  *
628  *
629  * @param exporter
630  *
631  */
fbExporterCloseSocket(fbExporter_t * exporter)632 static void fbExporterCloseSocket(
633     fbExporter_t                *exporter)
634 {
635     close(exporter->stream.fd);
636     exporter->active = FALSE;
637 }
638 
639 #if HAVE_OPENSSL
640 
641 /**
642  *fbExporterOpenTLS
643  *
644  *
645  * @param exporter
646  * @param err
647  *
648  * @return
649  */
fbExporterOpenTLS(fbExporter_t * exporter,GError ** err)650 static gboolean fbExporterOpenTLS(
651     fbExporter_t                *exporter,
652     GError                      **err)
653 {
654     BIO                         *conn = NULL;
655     gboolean                    ok = TRUE;
656 
657     /* Initialize SSL context if necessary */
658     if (!exporter->spec.conn->vssl_ctx) {
659         if (!fbConnSpecInitTLS(exporter->spec.conn, FALSE, err)) {
660             return FALSE;
661         }
662     }
663 
664     /* open underlying socket */
665     if (!fbExporterOpenSocket(exporter, err)) return FALSE;
666 
667     /* wrap a stream BIO around the opened socket */
668     if (!(conn = BIO_new_socket(exporter->stream.fd, 1))) {
669         ok = FALSE;
670         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
671                     "couldn't wrap socket to %s:%s for TLS: %s",
672                     exporter->spec.conn->host, exporter->spec.conn->svc,
673                     ERR_error_string(ERR_get_error(), NULL));
674         while (ERR_get_error());
675         goto end;
676     }
677 
678     /* create SSL socket */
679     if (!(exporter->ssl = SSL_new((SSL_CTX *)exporter->spec.conn->vssl_ctx))) {
680         ok = FALSE;
681         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
682                     "couldnt create TLS socket: %s",
683                     ERR_error_string(ERR_get_error(), NULL));
684         while (ERR_get_error());
685         goto end;
686     }
687 
688     /* connect to it */
689     SSL_set_bio(exporter->ssl, conn, conn);
690     if (SSL_connect(exporter->ssl) <= 0) {
691         ok = FALSE;
692         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
693                     "couldn't connect TLS socket to %s:%s: %s",
694                     exporter->spec.conn->host, exporter->spec.conn->svc,
695                     ERR_error_string(ERR_get_error(), NULL));
696         while (ERR_get_error());
697         goto end;
698     }
699 
700     /* FIXME do post-connection verification */
701 
702 end:
703     if (!ok) {
704         exporter->active = FALSE;
705         if (exporter->ssl) {
706             SSL_free(exporter->ssl);
707             exporter->ssl = NULL;
708         } else if (conn) {
709             BIO_vfree(conn);
710         }
711     }
712     return ok;
713 }
714 
715 #if HAVE_OPENSSL_DTLS
716 
717 /**
718  *fbExporterOpenDTLS
719  *
720  * @param exporter
721  * @param err
722  *
723  * @return
724  *
725  */
fbExporterOpenDTLS(fbExporter_t * exporter,GError ** err)726 static gboolean fbExporterOpenDTLS(
727     fbExporter_t                *exporter,
728     GError                      **err)
729 {
730     BIO                         *conn = NULL;
731     gboolean                    ok = TRUE;
732     struct sockaddr             peer;
733     size_t                      peerlen = sizeof(struct sockaddr);
734 
735     /* Initialize SSL context if necessary */
736     if (!exporter->spec.conn->vssl_ctx) {
737         if (!fbConnSpecInitTLS(exporter->spec.conn, FALSE, err)) {
738             return FALSE;
739         }
740     }
741 
742     /* open underlying socket */
743     if (!fbExporterOpenSocket(exporter, err)) return FALSE;
744 
745     /* wrap a datagram BIO around the opened socket */
746     if (!(conn = BIO_new_dgram(exporter->stream.fd, 1))) {
747         ok = FALSE;
748         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
749                     "couldn't wrap socket to %s:%s for DTLS: %s",
750                     exporter->spec.conn->host, exporter->spec.conn->svc,
751                     ERR_error_string(ERR_get_error(), NULL));
752         while (ERR_get_error());
753         goto end;
754     }
755 
756     /* Tell dgram bio what its name is */
757     if (getsockname(exporter->stream.fd, &peer, (socklen_t *)&peerlen) < 0) {
758         ok = FALSE;
759         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
760                     "couldn't wrap socket to %s:%s for DTLS: %s",
761                     exporter->spec.conn->host, exporter->spec.conn->svc,
762                     strerror(errno));
763         goto end;
764     }
765     BIO_ctrl_set_connected(conn, 1, &peer);
766 
767     /* create SSL socket */
768     if (!(exporter->ssl = SSL_new((SSL_CTX *)exporter->spec.conn->vssl_ctx))) {
769         ok = FALSE;
770         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
771                     "couldnt create DTLS socket: %s",
772                     ERR_error_string(ERR_get_error(), NULL));
773         while (ERR_get_error());
774         goto end;
775     }
776 
777     /* connect to it */
778     SSL_set_bio(exporter->ssl, conn, conn);
779     SSL_set_connect_state(exporter->ssl);
780 
781     /* FIXME do post-connection verification */
782 
783 end:
784     if (!ok) {
785         exporter->active = FALSE;
786         if (exporter->ssl) {
787             SSL_free(exporter->ssl);
788             exporter->ssl = NULL;
789         } else if (conn) {
790             BIO_vfree(conn);
791         }
792     }
793     return ok;
794 }
795 
796 #endif /* HAVE_OPENSSL_DTLS */
797 
798 /**
799  *fbExporterWriteTLS
800  *
801  *
802  * @param exporter
803  * @param msgbase
804  * @param msglen
805  * @param err
806  *
807  * @return
808  */
fbExporterWriteTLS(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)809 static gboolean fbExporterWriteTLS(
810     fbExporter_t                *exporter,
811     uint8_t                     *msgbase,
812     size_t                      msglen,
813     GError                      **err)
814 {
815     int                         rc;
816 
817     while (msglen) {
818         rc = SSL_write(exporter->ssl, msgbase, msglen);
819         if (rc <= 0) {
820             g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
821                         "I/O error: %s",
822                         ERR_error_string(ERR_get_error(), NULL));
823             while (ERR_get_error());
824             return FALSE;
825         }
826 
827         /* we sent some bytes - advance pointers */
828         msglen -= rc;
829         msgbase += rc;
830     }
831 
832     return TRUE;
833 }
834 
835 /**
836  *fbExporterCloseTLS
837  *
838  *
839  * @param exporter
840  *
841  */
fbExporterCloseTLS(fbExporter_t * exporter)842 static void fbExporterCloseTLS(
843     fbExporter_t                *exporter)
844 {
845     SSL_shutdown(exporter->ssl);
846     SSL_free(exporter->ssl);
847     exporter->active = FALSE;
848 }
849 
850 #endif /* HAVE_OPENSSL */
851 
852 /**
853  *fbExporterGetMTU
854  *
855  * @param exporter
856  *
857  * @return
858  */
fbExporterGetMTU(fbExporter_t * exporter)859 uint16_t        fbExporterGetMTU(
860     fbExporter_t                *exporter)
861 {
862     return exporter->mtu;
863 }
864 
865 /**
866  *fbExporterAllocNet
867  *
868  * @param spec
869  *
870  * @return
871  */
fbExporterAllocNet(fbConnSpec_t * spec)872 fbExporter_t    *fbExporterAllocNet(
873     fbConnSpec_t    *spec)
874 {
875     fbExporter_t    *exporter = NULL;
876 
877     /* Host must not be null */
878     g_assert(spec->host);
879 
880     /* Create a new exporter */
881     exporter = g_slice_new0(fbExporter_t);
882 
883     /* Copy the connection specifier */
884     exporter->spec.conn = fbConnSpecCopy(spec);
885 
886     /* Set up functions */
887     switch (spec->transport) {
888 #if FB_ENABLE_SCTP
889     case FB_SCTP:
890         exporter->exopen = fbExporterOpenSocket;
891         exporter->exwrite = fbExporterWriteSCTP;
892         exporter->exclose = fbExporterCloseSocket;
893         exporter->sctp_mode = FB_F_SCTP_AUTOSTREAM;
894         exporter->sctp_stream = 0;
895         exporter->mtu = 8192;
896         break;
897 #endif
898     case FB_TCP:
899         exporter->exopen = fbExporterOpenSocket;
900         exporter->exwrite = fbExporterWriteTCP;
901         exporter->exclose = fbExporterCloseSocket;
902         exporter->mtu = 8192;
903         break;
904     case FB_UDP:
905         exporter->exopen = fbExporterOpenSocket;
906         exporter->exwrite = fbExporterWriteUDP;
907         exporter->exclose = fbExporterCloseSocket;
908         exporter->mtu = 1420;
909         break;
910 #if HAVE_OPENSSL
911 #if HAVE_OPENSSL_DTLS_SCTP
912     case FB_DTLS_SCTP:
913         exporter->exopen = fbExporterOpenDTLS;
914         exporter->exwrite = fbExporterWriteTLS;
915         exporter->exclose = fbExporterCloseTLS;
916         exporter->sctp_mode = FB_F_SCTP_AUTOSTREAM;
917         exporter->sctp_stream = 0;
918         exporter->mtu = 8192;
919         break;
920 #endif
921     case FB_TLS_TCP:
922         exporter->exopen = fbExporterOpenTLS;
923         exporter->exwrite = fbExporterWriteTLS;
924         exporter->exclose = fbExporterCloseTLS;
925         exporter->mtu = 8192;
926         break;
927 #if HAVE_OPENSSL_DTLS
928     case FB_DTLS_UDP:
929         exporter->exopen = fbExporterOpenDTLS;
930         exporter->exwrite = fbExporterWriteTLS;
931         exporter->exclose = fbExporterCloseTLS;
932         exporter->mtu = 1320;
933         break;
934 #endif
935 #endif
936     default:
937 #ifndef FB_ENABLE_SCTP
938         if (spec->transport == FB_SCTP || spec->transport == FB_DTLS_SCTP) {
939             g_error("Libfixbuf not enabled for SCTP Transport. "
940                     " Run configure with --with-sctp");
941         }
942 #endif
943         if (spec->transport == FB_TLS_TCP || spec->transport == FB_DTLS_SCTP ||
944             spec->transport == FB_DTLS_UDP) {
945             g_error("Libfixbuf not enabled for this mode of transport. "
946                     " Run configure with --with-openssl");
947         }
948     }
949 
950     /* Return new exporter */
951     return exporter;
952 }
953 
954 #if HAVE_SPREAD
955 
956 /**
957  * fbExporterSpreadReceiver
958  *
959  */
960 
fbExporterSpreadReceiver(void * arg)961 static void * fbExporterSpreadReceiver(
962     void *arg)
963 {
964     int             i = 0;
965     char            grp[MAX_GROUP_NAME];
966     fbExporter_t    *exporter = (fbExporter_t *)arg;
967     fbSpreadSpec_t  *spread = exporter->spec.spread;
968     service         service_type = 0;
969     char            sender[MAX_GROUP_NAME];
970     int             num_groups = 0;
971     int16           mess_type = 0;
972     int             endian_mismatch;
973     int             run = 1;
974     int             ret;
975     membership_info memb_info;
976 
977 #if 0
978     /*  this loop is to allow a debugger to be attached */
979     int foo = 1;
980     do {
981         sleep(1);
982     } while (foo);
983 #endif
984 
985     ret = SP_connect(spread->daemon, 0, 0, 1,
986                      &(spread->recv_mbox), spread->recv_privgroup);
987 
988     if (ret != ACCEPT_SESSION) {
989 
990         g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_CONN,
991                     "error connecting to Spread daemon %s: %s",
992                     spread->daemon, fbConnSpreadError(ret));
993         return 0;
994     }
995 
996     for (i = 0; i < spread->num_groups; i++) {
997          /* exporter listener only joins template/control plane group,
998          the group name is always the name of the data plane group
999          plus 'T' added to the end. */
1000         memset(grp, 0, sizeof(grp));
1001         strncpy(grp, spread->groups[i].name, sizeof(grp) - 2);
1002         strcat(grp, "T");
1003         ret = SP_join(spread->recv_mbox, grp);
1004         if (ret) {
1005             g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_CONN,
1006                         "error joining to Spread group %s: %s",
1007                         spread->groups[i].name, fbConnSpreadError(ret));
1008             return 0;
1009         }
1010     }
1011 
1012     do {
1013         ret = SP_receive(spread->recv_mbox, &service_type, sender,
1014                          spread->recv_max_groups, &num_groups,
1015                          (char (*)[])spread->recv_groups,
1016                          &mess_type, &endian_mismatch, spread->recv_max,
1017                          spread->recv_mess);
1018 
1019         if (spread->recv_exit) {
1020             SP_disconnect(spread->recv_mbox);
1021             continue;
1022         }
1023 
1024         if (ret < 0) {
1025 
1026             if (ret == GROUPS_TOO_SHORT) {
1027                 g_free(spread->recv_groups);
1028                 spread->recv_max_groups = -ret;
1029                 spread->recv_groups = g_new0(sp_groupname_t,
1030                                              spread->recv_max_groups);
1031             } else if (ret == BUFFER_TOO_SHORT) {
1032                 g_free(spread->recv_mess);
1033                 spread->recv_max = -endian_mismatch;
1034                 spread->recv_mess = g_new0(char, spread->recv_max);
1035             } else {
1036                 g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_IO,
1037                             "error receiving Spread message: %s",
1038                             fbConnSpreadError(ret));
1039 
1040                 SP_disconnect(spread->recv_mbox);
1041 
1042                 run = 0;
1043             }
1044             continue;
1045         }
1046         /* actually received a message! Now process it */
1047         if (!Is_reg_memb_mess(service_type) ||
1048             !Is_caused_join_mess(service_type))
1049             continue;
1050 
1051         if (SP_get_memb_info(spread->recv_mess, service_type, &memb_info) < 0)
1052             continue;
1053         if (!memb_info.changed_member[0])
1054             continue;
1055         if (strncmp(memb_info.changed_member, spread->recv_privgroup,
1056                     MAX_GROUP_NAME) == 0)
1057         {
1058             continue;
1059         }
1060 
1061         /** Send Relevant Templates to New Member only. */
1062         /** memb_info.changed_member is private group name */
1063         /** sender is group they are subscribing to */
1064         fbSessionSetPrivateGroup(spread->session,
1065                                  sender, memb_info.changed_member);
1066 
1067     } while (run);
1068 
1069     return 0;
1070 }
1071 
1072 /**
1073  * fbExporterSpreadOpen
1074  *
1075  * @param exporter
1076  * @param err
1077  */
fbExporterSpreadOpen(fbExporter_t * exporter,GError ** err)1078 static gboolean fbExporterSpreadOpen(
1079      fbExporter_t *exporter,
1080      GError **err )
1081 {
1082     int ret;
1083 
1084     if (!exporter->spec.spread->daemon) {
1085         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1086                     "Spread daemon name cannot be null");
1087         return FALSE;
1088     }
1089     if (!exporter->spec.spread->daemon[0]) {
1090         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1091                     "Spread daemon name cannot be empty");
1092         return FALSE;
1093     }
1094 
1095     if (!(memchr(exporter->spec.spread->daemon, 0, 261))) {
1096         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1097                     "Spread daemon name too long");
1098         return FALSE;
1099     }
1100     if (!exporter->spec.spread->groups) {
1101         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1102                     "Spread groups cannot be null");
1103         return FALSE;
1104     }
1105     if (!exporter->spec.spread->groups[0].name[0]) {
1106         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1107                     "Spread groups cannot be empty");
1108         return FALSE;
1109     }
1110     if (!exporter->spec.spread->session) {
1111         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1112                     "Spread session cannot be null");
1113         return FALSE;
1114     }
1115 
1116     pthread_mutex_init(&(exporter->spec.spread->write_lock), 0);
1117 
1118     ret = SP_connect(exporter->spec.spread->daemon, 0, 0, 0,
1119                      &(exporter->spec.spread->mbox),
1120                      exporter->spec.spread->privgroup);
1121     if (ret != ACCEPT_SESSION) {
1122         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1123                     "error connecting to Spread daemon %s: %s",
1124                     exporter->spec.spread->daemon, fbConnSpreadError(ret));
1125         return FALSE;
1126     }
1127 
1128     exporter->spec.spread->recv_err = 0;
1129     exporter->spec.spread->recv_exit = 0;
1130     ret = pthread_create(&(exporter->spec.spread->recv_thread), NULL,
1131                          fbExporterSpreadReceiver, exporter );
1132     if (ret) {
1133         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1134                     "error creating Spread receiver thread: %s",strerror(ret));
1135         SP_disconnect(exporter->spec.spread->mbox);
1136         return FALSE;
1137     }
1138 
1139     /*pthread_detach(exporter->spec.spread->recv_thread);*/
1140 
1141     exporter->active = TRUE;
1142     return TRUE;
1143 }
1144 
1145 /**
1146  * fbExporterSpreadWrite
1147  *
1148  * @param exporter
1149  * @param msg
1150  * @param msg size
1151  * @param err
1152  */
fbExporterSpreadWrite(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)1153 static gboolean fbExporterSpreadWrite(
1154     fbExporter_t  *exporter,
1155     uint8_t       *msgbase,
1156     size_t         msglen,
1157     GError       **err)
1158 {
1159     int ret = 0;
1160     fbSpreadSpec_t *spread;
1161 
1162     pthread_mutex_lock(&(exporter->spec.spread->write_lock));
1163 
1164     spread = exporter->spec.spread;
1165 
1166     if (spread->num_groups_to_send == 1) {
1167         ret = SP_multicast(spread->mbox, RELIABLE_MESS,
1168                            spread->groups_to_send[0].name,
1169                            0, msglen, (const char *)msgbase);
1170     } else if (spread->num_groups == 1) {
1171         ret = SP_multicast(spread->mbox, RELIABLE_MESS,
1172                            spread->groups[0].name,
1173                            0, msglen, (const char *)msgbase);
1174     } else if (spread->num_groups_to_send > 1) {
1175         ret = SP_multigroup_multicast(spread->mbox, RELIABLE_MESS,
1176                                       spread->num_groups_to_send,
1177                                       (const char (*)[])spread->groups_to_send,
1178                                       0, msglen, (const char *)msgbase);
1179 
1180     } else {
1181         ret = SP_multigroup_multicast(spread->mbox, RELIABLE_MESS,
1182                                       spread->num_groups,
1183                                       (const char (*)[])spread->groups,
1184                                       0, msglen, (const char *)msgbase);
1185     }
1186 
1187     if (ret < 0) {
1188         g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1189                     "error receiving Spread message: %s",
1190                     fbConnSpreadError(ret));
1191     }
1192 
1193     pthread_mutex_unlock(&(exporter->spec.spread->write_lock));
1194 
1195     return (ret < 0) ? FALSE : TRUE;
1196 }
1197 
1198 /**
1199  * fbExporterSpreadClose
1200  *
1201  * @param exporter
1202  */
1203 
fbExporterSpreadClose(fbExporter_t * exporter)1204 static void fbExporterSpreadClose(
1205     fbExporter_t *exporter)
1206 {
1207     if (exporter->active) {
1208         pthread_cancel(exporter->spec.spread->recv_thread);
1209         pthread_join(exporter->spec.spread->recv_thread, NULL);
1210         SP_disconnect(exporter->spec.spread->mbox);
1211     }
1212     exporter->active = FALSE;
1213 }
1214 
1215 /**
1216  * fbExporterAllocSpread
1217  *
1218  * @param Spread_params
1219  */
fbExporterAllocSpread(fbSpreadParams_t * params)1220 fbExporter_t *fbExporterAllocSpread(
1221     fbSpreadParams_t *params)
1222 {
1223     fbExporter_t    *exporter = NULL;
1224 
1225     g_assert(params->daemon);
1226     g_assert(params->groups);
1227     g_assert(params->groups[0]);
1228 
1229     exporter = g_slice_new0(fbExporter_t);
1230 
1231     exporter->spec.spread = fbConnSpreadCopy(params);
1232 
1233     exporter->exopen = fbExporterSpreadOpen;
1234     exporter->exwrite = fbExporterSpreadWrite;
1235     exporter->exclose = fbExporterSpreadClose;
1236 #ifdef DEBUG
1237     exporter->mtu = 8192;
1238 #else
1239     exporter->mtu = FB_SPREAD_MTU;
1240 #endif
1241 
1242     return exporter;
1243 }
1244 
1245 /**
1246  * fbExporterSetGroupsToSend
1247  *
1248  * @param exporter
1249  * @param groups
1250  * @param num_groups
1251  *
1252  */
fbExporterSetGroupsToSend(fbExporter_t * exporter,char ** groups,int num_groups)1253 void fbExporterSetGroupsToSend(
1254     fbExporter_t      *exporter,
1255     char              **groups,
1256     int               num_groups)
1257 {
1258     int n = 0;
1259     char **g = 0;
1260     fbSpreadSpec_t   *spread = exporter->spec.spread;
1261 
1262     if (!spread->groups_to_send) {
1263         spread->groups_to_send = g_new0(sp_groupname_t, spread->num_groups);
1264     }
1265 
1266     g = groups;
1267 
1268     for (n=0; n < num_groups; n++) {
1269         strncpy(spread->groups_to_send[n].name, *g, MAX_GROUP_NAME-1);
1270         g++;
1271     }
1272 
1273     spread->num_groups_to_send = n;
1274 }
1275 
1276 /**
1277  * fbExporterCheckGroups
1278  *
1279  * If we are sending to a new (different) group of Spread Groups - return TRUE
1280  * to emit the buffer and set new export groups.
1281  *
1282  * @param exporter
1283  * @param groups
1284  * @param num_groups
1285  */
fbExporterCheckGroups(fbExporter_t * exporter,char ** groups,int num_groups)1286 gboolean fbExporterCheckGroups(
1287     fbExporter_t *exporter,
1288     char **groups,
1289     int    num_groups)
1290 {
1291     int n;
1292     fbSpreadSpec_t *spread = exporter->spec.spread;
1293 
1294     if (num_groups != spread->num_groups_to_send) {
1295         return TRUE;
1296     }
1297 
1298     for (n = 0; n < num_groups; n++) {
1299         if (strcmp(spread->groups_to_send[n].name, groups[n]) != 0) {
1300             return TRUE;
1301         }
1302     }
1303 
1304     return FALSE;
1305 
1306 }
1307 
1308 #endif /* HAVE_SPREAD */
1309 
1310 /**
1311  *fbExporterSetStream
1312  *
1313  * @param exporter
1314  * @param sctp_stream
1315  *
1316  */
fbExporterSetStream(fbExporter_t * exporter,int sctp_stream)1317 void fbExporterSetStream(
1318     fbExporter_t        *exporter,
1319     int                 sctp_stream)
1320 {
1321     exporter->sctp_mode &= ~FB_F_SCTP_AUTOSTREAM;
1322     exporter->sctp_stream = sctp_stream;
1323 }
1324 
1325 /**
1326  *fbExporterAutoStream
1327  *
1328  * @param exporter
1329  *
1330  *
1331  */
fbExporterAutoStream(fbExporter_t * exporter)1332 void fbExporterAutoStream(
1333     fbExporter_t        *exporter)
1334 {
1335     exporter->sctp_mode |= FB_F_SCTP_AUTOSTREAM;
1336 }
1337 
1338 #if 0
1339 /**
1340  *fbExporterSetPRTTL
1341  *
1342  * @param exporter
1343  * @param pr_ttl
1344  *
1345  *
1346  */
1347 void fbExporterSetPRTTL(
1348     fbExporter_t        *exporter,
1349     int                 pr_ttl)
1350 {
1351     if (pr_ttl > 0) {
1352         exporter->sctp_mode |= FB_F_SCTP_PR_TTL;
1353         exporter->sctp_pr_param = pr_ttl;
1354     } else {
1355         exporter->sctp_mode &= ~FB_F_SCTP_PR_TTL;
1356         exporter->sctp_pr_param = 0;
1357     }
1358 }
1359 #endif  /* 0 */
1360 
1361 /**
1362  *fbExportMessage
1363  *
1364  *
1365  * @param exporter
1366  * @param msgbase
1367  * @param msglen
1368  * @param err
1369  *
1370  * @return
1371  *
1372  */
fbExportMessage(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)1373 gboolean        fbExportMessage(
1374     fbExporter_t    *exporter,
1375     uint8_t         *msgbase,
1376     size_t          msglen,
1377     GError          **err)
1378 {
1379     /* Ensure stream is open */
1380     if (!exporter->active) {
1381         g_assert(exporter->exopen);
1382         if (!exporter->exopen(exporter, err)) return FALSE;
1383     }
1384 
1385     /* Attempt to write message */
1386     if (exporter->exwrite(exporter, msgbase, msglen, err)) return TRUE;
1387 
1388     /* Close exporter on write failure */
1389     if (exporter->exclose) exporter->exclose(exporter);
1390     return FALSE;
1391 }
1392 
1393 /**
1394  *fbExporterFree
1395  *
1396  *
1397  * @param exporter
1398  *
1399  */
fbExporterFree(fbExporter_t * exporter)1400 void                fbExporterFree(
1401     fbExporter_t       *exporter)
1402 {
1403     fbExporterClose(exporter);
1404     if (exporter->exwrite == fbExporterWriteFile)
1405     {
1406         g_free(exporter->spec.path);
1407     }
1408 #ifdef HAVE_SPREAD
1409     else if (exporter->exwrite == fbExporterSpreadWrite)
1410     {
1411         fbConnSpreadFree( exporter->spec.spread );
1412     }
1413 #endif
1414     else {
1415         fbConnSpecFree(exporter->spec.conn);
1416     }
1417 
1418     g_slice_free(fbExporter_t, exporter);
1419 }
1420 
1421 /**
1422  *fbExporterClose
1423  *
1424  *
1425  *
1426  * @param exporter
1427  */
fbExporterClose(fbExporter_t * exporter)1428 void fbExporterClose(
1429     fbExporter_t    *exporter)
1430 {
1431     if (exporter->active && exporter->exclose) exporter->exclose(exporter);
1432 }
1433 
1434 /**
1435  * fbExporterGetMsgLen
1436  *
1437  *
1438  * @param exporter
1439  */
1440 
fbExporterGetMsgLen(fbExporter_t * exporter)1441 size_t fbExporterGetMsgLen(
1442     fbExporter_t   *exporter)
1443 {
1444     return exporter->msg_len;
1445 }
1446