1 /**
2 * @internal
3 *
4 * @file fbexporter.c
5 *
6 * IPFIX Exporting Process single transport session implementation
7 *
8 ** ------------------------------------------------------------------------
9 ** Copyright (C) 2006-2019 Carnegie Mellon University. All Rights Reserved.
10 ** ------------------------------------------------------------------------
11 ** Authors: Brian Trammell
12 ** ------------------------------------------------------------------------
13 ** @OPENSOURCE_LICENSE_START@
14 ** libfixbuf 2.0
15 **
16 ** Copyright 2018-2019 Carnegie Mellon University. All Rights Reserved.
17 **
18 ** NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE
19 ** ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN "AS-IS"
20 ** BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND,
21 ** EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT
22 ** LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY,
23 ** EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE
24 ** MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF
25 ** ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR
26 ** COPYRIGHT INFRINGEMENT.
27 **
28 ** Released under a GNU-Lesser GPL 3.0-style license, please see
29 ** LICENSE.txt or contact permission@sei.cmu.edu for full terms.
30 **
31 ** [DISTRIBUTION STATEMENT A] This material has been approved for
32 ** public release and unlimited distribution. Please see Copyright
33 ** notice for non-US Government use and distribution.
34 **
35 ** Carnegie Mellon(R) and CERT(R) are registered in the U.S. Patent
36 ** and Trademark Office by Carnegie Mellon University.
37 **
38 ** DM18-0325
39 ** @OPENSOURCE_LICENSE_END@
40 ** ------------------------------------------------------------------------
41 *
42 */
43
44 #define _FIXBUF_SOURCE_
45 /*#define _GNU_SOURCE*/
46 #include <fixbuf/private.h>
47
48
49 /**
50 * If set in exporter SCTP mode, use simple automatic stream selection as
51 * specified in the IPFIX protocol without flexible stream selection: send
52 * templates on stream 0, and data on stream 1.
53 */
54 #define FB_F_SCTP_AUTOSTREAM 0x80000000
55
56 /**
57 * If set in exporter SCTP mode, use TTL-based partial reliability for
58 * non-template messages.
59 */
60 #define FB_F_SCTP_PR_TTL 0x40000000
61
62 typedef gboolean (*fbExporterOpen_fn)(
63 fbExporter_t *exporter,
64 GError **err);
65
66 typedef gboolean (*fbExporterWrite_fn)(
67 fbExporter_t *exporter,
68 uint8_t *msgbase,
69 size_t msglen,
70 GError **err);
71
72 typedef void (*fbExporterClose_fn)(
73 fbExporter_t *exporter);
74
75 struct fbExporter_st {
76 /** Specifier used for stream open */
77 union {
78 fbConnSpec_t *conn;
79 #if HAVE_SPREAD
80 fbSpreadSpec_t *spread;
81 #endif
82 char *path;
83 } spec;
84 /** Current export stream */
85 union {
86 /** Buffered file pointer, for file transport */
87 FILE *fp;
88 /** Buffer for data if providing own transport */
89 uint8_t *buffer;
90 /**
91 * Unbuffered socket, for SCTP, TCP, or UDP transport.
92 * Also used as base socket for TLS and DTLS support.
93 */
94 int fd;
95 } stream;
96 /** SCTP mode. Union of FB_SCTP_F_* flags. */
97 uint32_t sctp_mode;
98 /** Next SCTP stream */
99 int sctp_stream;
100 /** Partial reliability parameter (see mode) */
101 int sctp_pr_param;
102 #if HAVE_OPENSSL
103 /** OpenSSL socket, for TLS or DTLS over the socket in fd. */
104 SSL *ssl;
105 #endif
106 gboolean active;
107 size_t msg_len;
108 fbExporterOpen_fn exopen;
109 fbExporterWrite_fn exwrite;
110 fbExporterClose_fn exclose;
111 uint16_t mtu;
112 };
113
114 /**
115 *fbExporterOpenFile
116 *
117 *
118 * @param exporter
119 * @param err
120 *
121 * @return
122 */
fbExporterOpenFile(fbExporter_t * exporter,GError ** err)123 static gboolean fbExporterOpenFile(
124 fbExporter_t *exporter,
125 GError **err)
126 {
127 /* check to see if we're opening stdout */
128 if ((strlen(exporter->spec.path) == 1) &&
129 (exporter->spec.path[0] == '-'))
130 {
131 /* don't open a terminal */
132 if (isatty(fileno(stdout))) {
133 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
134 "Refusing to open stdout terminal for export");
135 return FALSE;
136 }
137
138 /* yep, stdout */
139 exporter->stream.fp = stdout;
140 } else {
141 /* nope, just a regular file */
142 exporter->stream.fp = fopen(exporter->spec.path, "w");
143 }
144
145 /* check for error */
146 if (!exporter->stream.fp) {
147 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
148 "Couldn't open %s for export: %s",
149 exporter->spec.path, strerror(errno));
150 return FALSE;
151 }
152
153 /* set active flag */
154 exporter->active = TRUE;
155
156 return TRUE;
157 }
158
159 /**
160 *fbExporterWriteFile
161 *
162 *
163 * @param exporter
164 * @param msgbase
165 * @param msglen
166 * @param err
167 *
168 * @return
169 */
fbExporterWriteFile(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)170 static gboolean fbExporterWriteFile(
171 fbExporter_t *exporter,
172 uint8_t *msgbase,
173 size_t msglen,
174 GError **err)
175 {
176 size_t rc;
177
178 rc = fwrite(msgbase, 1, msglen, exporter->stream.fp);
179
180 if (rc == msglen) {
181 return TRUE;
182 } else {
183 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
184 "Couldn't write %u bytes to %s: %s",
185 (uint32_t)msglen, exporter->spec.path, strerror(errno));
186 return FALSE;
187 }
188 }
189
190 /**
191 *fbExporterCloseFile
192 *
193 *
194 * @param exporter
195 *
196 */
fbExporterCloseFile(fbExporter_t * exporter)197 static void fbExporterCloseFile(
198 fbExporter_t *exporter)
199 {
200 if (exporter->stream.fp == stdout) {
201 fflush(exporter->stream.fp);
202 } else {
203 fclose(exporter->stream.fp);
204 }
205 exporter->stream.fp = NULL;
206 exporter->active = FALSE;
207 }
208
209 /**
210 *fbExporterAllocFile
211 *
212 *
213 * @param path
214 * @param exporter
215 *
216 * @return
217 */
fbExporterAllocFile(const char * path)218 fbExporter_t *fbExporterAllocFile(
219 const char *path)
220 {
221 fbExporter_t *exporter;
222
223 /* Create a new exporter */
224 exporter = g_slice_new0(fbExporter_t);
225
226 /* Copy the path */
227 exporter->spec.path = g_strdup(path);
228
229 /* Set up stream management functions */
230 exporter->exopen = fbExporterOpenFile;
231 exporter->exwrite = fbExporterWriteFile;
232 exporter->exclose = fbExporterCloseFile;
233 exporter->mtu = 65496;
234
235 return exporter;
236 }
237
238 /**
239 * fbExporterOpenBuffer
240 *
241 *
242 */
fbExporterOpenBuffer(fbExporter_t * exporter,GError ** err)243 static gboolean fbExporterOpenBuffer(
244 fbExporter_t *exporter,
245 GError **err)
246 {
247 (void)err;
248 /* set active flag */
249 exporter->active = TRUE;
250
251 return TRUE;
252 }
253
254 /**
255 *fbExporterCloseBuffer
256 *
257 *
258 * @param exporter
259 *
260 */
fbExporterCloseBuffer(fbExporter_t * exporter)261 static void fbExporterCloseBuffer(
262 fbExporter_t *exporter)
263 {
264 exporter->active = FALSE;
265 }
266
fbExporterWriteBuffer(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)267 static gboolean fbExporterWriteBuffer(
268 fbExporter_t *exporter,
269 uint8_t *msgbase,
270 size_t msglen,
271 GError **err)
272 {
273 (void)err;
274 memcpy(exporter->stream.buffer, msgbase, msglen);
275 exporter->msg_len = msglen;
276
277 return TRUE;
278
279 }
280
281 /**
282 * fbExporterAllocBuffer
283 *
284 *
285 *
286 */
fbExporterAllocBuffer(uint8_t * buf,uint16_t bufsize)287 fbExporter_t *fbExporterAllocBuffer(
288 uint8_t *buf,
289 uint16_t bufsize)
290 {
291 fbExporter_t *exporter = NULL;
292
293 exporter = g_slice_new0(fbExporter_t);
294 exporter->exwrite = fbExporterWriteBuffer;
295 exporter->exopen = fbExporterOpenBuffer;
296 exporter->exclose = fbExporterCloseBuffer;
297 exporter->mtu = bufsize;
298 exporter->stream.buffer = buf;
299
300 return exporter;
301 }
302
303 /**
304 *fbExporterAllocFP
305 *
306 * @param fp
307 *
308 * @return
309 */
fbExporterAllocFP(FILE * fp)310 fbExporter_t *fbExporterAllocFP(
311 FILE *fp)
312 {
313 fbExporter_t *exporter;
314
315 /* Create a new exporter */
316 exporter = g_slice_new0(fbExporter_t);
317
318 /* Reference the path */
319 exporter->spec.path = g_strdup("FP");
320
321 /* Set up stream management functions */
322 exporter->exwrite = fbExporterWriteFile;
323 exporter->mtu = 65496;
324
325 /* set active flag */
326 exporter->active = TRUE;
327
328 /* set file pointer */
329 exporter->stream.fp = fp;
330
331 return exporter;
332 }
333
334 /**
335 *fbExporterIgnoreSigpipe
336 *
337 *
338 */
fbExporterIgnoreSigpipe(void)339 static void fbExporterIgnoreSigpipe(
340 void)
341 {
342 static gboolean ignored = FALSE;
343 struct sigaction sa, osa;
344
345 if (ignored) return;
346
347 sa.sa_handler = SIG_IGN;
348 sigemptyset(&sa.sa_mask);
349 sa.sa_flags = SA_RESTART;
350 if (sigaction(SIGPIPE,&sa,&osa)) {
351 g_error("sigaction(SIGPIPE) failed: %s", strerror(errno));
352 }
353
354 ignored = TRUE;
355 }
356
357 /**
358 *fbExporterMaxSendbuf
359 *
360 *
361 * @param sock
362 * @param size
363 *
364 * @return
365 */
fbExporterMaxSendbuf(int sock,int * size)366 static gboolean fbExporterMaxSendbuf(
367 int sock,
368 int *size)
369 {
370 while (*size > 4096) {
371 if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, size,
372 sizeof(*size)) == 0)
373 {
374 return TRUE;
375 }
376 if (errno != ENOBUFS) {
377 return FALSE;
378 }
379 *size -= (*size > 1024 * 1024)
380 ? 1024 * 1024
381 : 2048;
382 }
383 return FALSE;
384 }
385
386 #define FB_SOCKBUF_DEFAULT (4 * 1024 * 1024)
387
388 /**
389 *fbExporterOpenSocket
390 *
391 * @param exporter
392 * @param err
393 *
394 * @return
395 */
fbExporterOpenSocket(fbExporter_t * exporter,GError ** err)396 static gboolean fbExporterOpenSocket(
397 fbExporter_t *exporter,
398 GError **err)
399 {
400 struct addrinfo *ai = NULL;
401 int sockbuf_sz = FB_SOCKBUF_DEFAULT;
402
403 /* Turn the exporter connection specifier into an addrinfo */
404 if (!fbConnSpecLookupAI(exporter->spec.conn, FALSE, err)) return FALSE;
405 ai = (struct addrinfo *)exporter->spec.conn->vai;
406
407 /* ignore sigpipe if we're doing TCP or SCTP export */
408 if ((exporter->spec.conn->transport == FB_TCP) ||
409 (exporter->spec.conn->transport == FB_TLS_TCP)
410 #if FB_ENABLE_SCTP
411 || (exporter->spec.conn->transport == FB_SCTP)
412 || (exporter->spec.conn->transport == FB_DTLS_SCTP)
413 #endif
414 ) {
415 fbExporterIgnoreSigpipe();
416 }
417
418 /* open socket of appropriate type for connection specifier */
419 do {
420 #if FB_ENABLE_SCTP
421 if ((exporter->spec.conn->transport == FB_SCTP) ||
422 (exporter->spec.conn->transport == FB_DTLS_SCTP))
423 {
424 /* Kludge for SCTP. addrinfo doesn't accept SCTP hints. */
425 ai->ai_socktype = SOCK_STREAM;
426 ai->ai_protocol = IPPROTO_SCTP;
427 }
428 #endif
429
430 exporter->stream.fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
431 if (exporter->stream.fd < 0) continue;
432 if (connect(exporter->stream.fd, ai->ai_addr, ai->ai_addrlen) == 0) break;
433 close(exporter->stream.fd);
434 } while ((ai = ai->ai_next));
435
436 /* check for no openable socket */
437 if (ai == NULL) {
438 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
439 "couldn't create connected TCP socket to %s:%s %s",
440 exporter->spec.conn->host, exporter->spec.conn->svc,
441 strerror(errno));
442 return FALSE;
443 }
444
445 /* increase send buffer size for UDP */
446 if ((exporter->spec.conn->transport == FB_UDP) ||
447 (exporter->spec.conn->transport == FB_DTLS_UDP))
448 {
449 if (!fbExporterMaxSendbuf(exporter->stream.fd, &sockbuf_sz)) {
450 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
451 "couldn't set socket buffer size on %s: %s",
452 exporter->spec.conn->host, strerror(errno));
453 close(exporter->stream.fd);
454 return FALSE;
455 }
456 }
457
458 /* set active flag */
459 exporter->active = TRUE;
460
461 return TRUE;
462 }
463
464 #if FB_ENABLE_SCTP
465
466 /**
467 *fbExporterWriteSCTP
468 *
469 *
470 * @param exporter
471 * @param msgbase
472 * @param msglen
473 * @param err
474 *
475 * @return
476 */
fbExporterWriteSCTP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)477 static gboolean fbExporterWriteSCTP(
478 fbExporter_t *exporter,
479 uint8_t *msgbase,
480 size_t msglen,
481 GError **err)
482 {
483 ssize_t rc;
484 uint16_t initial_setid;
485 gboolean is_template;
486 int sctp_flags = 0;
487 uint32_t sctp_ttl = 0;
488
489 /* Check to see if this is a template message */
490 initial_setid = *(uint16_t *)(msgbase + 16);
491 if (initial_setid == FB_TID_TS || initial_setid == FB_TID_OTS) {
492 is_template = TRUE;
493 } else {
494 is_template = FALSE;
495 }
496
497 /* Do automatic stream selection if requested. */
498 if (exporter->sctp_mode & FB_F_SCTP_AUTOSTREAM) {
499 if (is_template) {
500 exporter->sctp_stream = 0;
501 } else {
502 exporter->sctp_stream = 1;
503 }
504 }
505
506 /* Use partial reliability if requested for non-template messages */
507 if (!is_template && (exporter->sctp_mode & FB_F_SCTP_PR_TTL)) {
508 sctp_flags |= FB_F_SCTP_PR_TTL;
509 sctp_ttl = exporter->sctp_pr_param;
510 }
511
512 rc = sctp_sendmsg(exporter->stream.fd, msgbase, msglen,
513 NULL, 0, /* destination sockaddr */
514 0, /* payload protocol */
515 sctp_flags, /* flags */
516 exporter->sctp_stream, /* stream */
517 sctp_ttl, /* message lifetime (ms) */
518 0); /* context */
519
520 if (rc == (ssize_t)msglen) {
521 return TRUE;
522 } else if (rc == -1) {
523 if (errno == EPIPE) {
524 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLWRITE,
525 "Connection reset (EPIPE) on SCTP write");
526 } else {
527 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
528 "I/O error: %s", strerror(errno));
529 }
530 return FALSE;
531 } else {
532 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
533 "short write: wrote %d while writing %lu",
534 (int)rc, msglen);
535 return FALSE;
536 }
537 }
538
539 #endif /* FB_ENABLE_SCTP */
540
541
542 /**
543 *fbExporterWriteTCP
544 *
545 *
546 * @param exporter
547 * @param msgbase
548 * @param msglen
549 * @param err
550 *
551 * @return
552 */
fbExporterWriteTCP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)553 static gboolean fbExporterWriteTCP(
554 fbExporter_t *exporter,
555 uint8_t *msgbase,
556 size_t msglen,
557 GError **err)
558 {
559 ssize_t rc;
560
561 rc = write(exporter->stream.fd, msgbase, msglen);
562 if (rc == (ssize_t)msglen) {
563 return TRUE;
564 } else if (rc == -1) {
565 if (errno == EPIPE) {
566 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_NLWRITE,
567 "Connection reset (EPIPE) on TCP write");
568 } else {
569 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
570 "I/O error: %s", strerror(errno));
571 }
572 return FALSE;
573 } else {
574 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
575 "short write: wrote %d while writing %u",
576 (int)rc, (uint32_t)msglen);
577 return FALSE;
578 }
579 }
580
581 /**
582 * fbExporterWriteUDP
583 *
584 *
585 * @param exporter
586 * @param msgbase
587 * @param msglen
588 * @param err
589 *
590 * @return
591 */
fbExporterWriteUDP(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)592 static gboolean fbExporterWriteUDP(
593 fbExporter_t *exporter,
594 uint8_t *msgbase,
595 size_t msglen,
596 GError **err)
597 {
598 static gboolean sendGood = TRUE;
599 ssize_t rc;
600
601 /* Send the buffer as a single message */
602 rc = send(exporter->stream.fd, msgbase, msglen, 0);
603
604 /* Deal with the results */
605 if (rc == (ssize_t)msglen) {
606 return TRUE;
607 } else if (rc == -1) {
608 if (TRUE == sendGood) {
609 g_warning( "I/O error on UDP send: %s (socket closed on receiver?)",
610 strerror(errno));
611 g_warning("packets will be lost");
612 send(exporter->stream.fd, msgbase, msglen, 0);
613 sendGood = FALSE;
614 return TRUE;
615 }
616 return TRUE;
617 } else {
618 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
619 "Short write on UDP send: wrote %d while writing %u",
620 (int)rc, (uint32_t)msglen);
621 return FALSE;
622 }
623 }
624
625 /**
626 *fbExporterCloseSocket
627 *
628 *
629 * @param exporter
630 *
631 */
fbExporterCloseSocket(fbExporter_t * exporter)632 static void fbExporterCloseSocket(
633 fbExporter_t *exporter)
634 {
635 close(exporter->stream.fd);
636 exporter->active = FALSE;
637 }
638
639 #if HAVE_OPENSSL
640
641 /**
642 *fbExporterOpenTLS
643 *
644 *
645 * @param exporter
646 * @param err
647 *
648 * @return
649 */
fbExporterOpenTLS(fbExporter_t * exporter,GError ** err)650 static gboolean fbExporterOpenTLS(
651 fbExporter_t *exporter,
652 GError **err)
653 {
654 BIO *conn = NULL;
655 gboolean ok = TRUE;
656
657 /* Initialize SSL context if necessary */
658 if (!exporter->spec.conn->vssl_ctx) {
659 if (!fbConnSpecInitTLS(exporter->spec.conn, FALSE, err)) {
660 return FALSE;
661 }
662 }
663
664 /* open underlying socket */
665 if (!fbExporterOpenSocket(exporter, err)) return FALSE;
666
667 /* wrap a stream BIO around the opened socket */
668 if (!(conn = BIO_new_socket(exporter->stream.fd, 1))) {
669 ok = FALSE;
670 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
671 "couldn't wrap socket to %s:%s for TLS: %s",
672 exporter->spec.conn->host, exporter->spec.conn->svc,
673 ERR_error_string(ERR_get_error(), NULL));
674 while (ERR_get_error());
675 goto end;
676 }
677
678 /* create SSL socket */
679 if (!(exporter->ssl = SSL_new((SSL_CTX *)exporter->spec.conn->vssl_ctx))) {
680 ok = FALSE;
681 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
682 "couldnt create TLS socket: %s",
683 ERR_error_string(ERR_get_error(), NULL));
684 while (ERR_get_error());
685 goto end;
686 }
687
688 /* connect to it */
689 SSL_set_bio(exporter->ssl, conn, conn);
690 if (SSL_connect(exporter->ssl) <= 0) {
691 ok = FALSE;
692 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
693 "couldn't connect TLS socket to %s:%s: %s",
694 exporter->spec.conn->host, exporter->spec.conn->svc,
695 ERR_error_string(ERR_get_error(), NULL));
696 while (ERR_get_error());
697 goto end;
698 }
699
700 /* FIXME do post-connection verification */
701
702 end:
703 if (!ok) {
704 exporter->active = FALSE;
705 if (exporter->ssl) {
706 SSL_free(exporter->ssl);
707 exporter->ssl = NULL;
708 } else if (conn) {
709 BIO_vfree(conn);
710 }
711 }
712 return ok;
713 }
714
715 #if HAVE_OPENSSL_DTLS
716
717 /**
718 *fbExporterOpenDTLS
719 *
720 * @param exporter
721 * @param err
722 *
723 * @return
724 *
725 */
fbExporterOpenDTLS(fbExporter_t * exporter,GError ** err)726 static gboolean fbExporterOpenDTLS(
727 fbExporter_t *exporter,
728 GError **err)
729 {
730 BIO *conn = NULL;
731 gboolean ok = TRUE;
732 struct sockaddr peer;
733 size_t peerlen = sizeof(struct sockaddr);
734
735 /* Initialize SSL context if necessary */
736 if (!exporter->spec.conn->vssl_ctx) {
737 if (!fbConnSpecInitTLS(exporter->spec.conn, FALSE, err)) {
738 return FALSE;
739 }
740 }
741
742 /* open underlying socket */
743 if (!fbExporterOpenSocket(exporter, err)) return FALSE;
744
745 /* wrap a datagram BIO around the opened socket */
746 if (!(conn = BIO_new_dgram(exporter->stream.fd, 1))) {
747 ok = FALSE;
748 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
749 "couldn't wrap socket to %s:%s for DTLS: %s",
750 exporter->spec.conn->host, exporter->spec.conn->svc,
751 ERR_error_string(ERR_get_error(), NULL));
752 while (ERR_get_error());
753 goto end;
754 }
755
756 /* Tell dgram bio what its name is */
757 if (getsockname(exporter->stream.fd, &peer, (socklen_t *)&peerlen) < 0) {
758 ok = FALSE;
759 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
760 "couldn't wrap socket to %s:%s for DTLS: %s",
761 exporter->spec.conn->host, exporter->spec.conn->svc,
762 strerror(errno));
763 goto end;
764 }
765 BIO_ctrl_set_connected(conn, 1, &peer);
766
767 /* create SSL socket */
768 if (!(exporter->ssl = SSL_new((SSL_CTX *)exporter->spec.conn->vssl_ctx))) {
769 ok = FALSE;
770 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
771 "couldnt create DTLS socket: %s",
772 ERR_error_string(ERR_get_error(), NULL));
773 while (ERR_get_error());
774 goto end;
775 }
776
777 /* connect to it */
778 SSL_set_bio(exporter->ssl, conn, conn);
779 SSL_set_connect_state(exporter->ssl);
780
781 /* FIXME do post-connection verification */
782
783 end:
784 if (!ok) {
785 exporter->active = FALSE;
786 if (exporter->ssl) {
787 SSL_free(exporter->ssl);
788 exporter->ssl = NULL;
789 } else if (conn) {
790 BIO_vfree(conn);
791 }
792 }
793 return ok;
794 }
795
796 #endif /* HAVE_OPENSSL_DTLS */
797
798 /**
799 *fbExporterWriteTLS
800 *
801 *
802 * @param exporter
803 * @param msgbase
804 * @param msglen
805 * @param err
806 *
807 * @return
808 */
fbExporterWriteTLS(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)809 static gboolean fbExporterWriteTLS(
810 fbExporter_t *exporter,
811 uint8_t *msgbase,
812 size_t msglen,
813 GError **err)
814 {
815 int rc;
816
817 while (msglen) {
818 rc = SSL_write(exporter->ssl, msgbase, msglen);
819 if (rc <= 0) {
820 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_IO,
821 "I/O error: %s",
822 ERR_error_string(ERR_get_error(), NULL));
823 while (ERR_get_error());
824 return FALSE;
825 }
826
827 /* we sent some bytes - advance pointers */
828 msglen -= rc;
829 msgbase += rc;
830 }
831
832 return TRUE;
833 }
834
835 /**
836 *fbExporterCloseTLS
837 *
838 *
839 * @param exporter
840 *
841 */
fbExporterCloseTLS(fbExporter_t * exporter)842 static void fbExporterCloseTLS(
843 fbExporter_t *exporter)
844 {
845 SSL_shutdown(exporter->ssl);
846 SSL_free(exporter->ssl);
847 exporter->active = FALSE;
848 }
849
850 #endif /* HAVE_OPENSSL */
851
852 /**
853 *fbExporterGetMTU
854 *
855 * @param exporter
856 *
857 * @return
858 */
fbExporterGetMTU(fbExporter_t * exporter)859 uint16_t fbExporterGetMTU(
860 fbExporter_t *exporter)
861 {
862 return exporter->mtu;
863 }
864
865 /**
866 *fbExporterAllocNet
867 *
868 * @param spec
869 *
870 * @return
871 */
fbExporterAllocNet(fbConnSpec_t * spec)872 fbExporter_t *fbExporterAllocNet(
873 fbConnSpec_t *spec)
874 {
875 fbExporter_t *exporter = NULL;
876
877 /* Host must not be null */
878 g_assert(spec->host);
879
880 /* Create a new exporter */
881 exporter = g_slice_new0(fbExporter_t);
882
883 /* Copy the connection specifier */
884 exporter->spec.conn = fbConnSpecCopy(spec);
885
886 /* Set up functions */
887 switch (spec->transport) {
888 #if FB_ENABLE_SCTP
889 case FB_SCTP:
890 exporter->exopen = fbExporterOpenSocket;
891 exporter->exwrite = fbExporterWriteSCTP;
892 exporter->exclose = fbExporterCloseSocket;
893 exporter->sctp_mode = FB_F_SCTP_AUTOSTREAM;
894 exporter->sctp_stream = 0;
895 exporter->mtu = 8192;
896 break;
897 #endif
898 case FB_TCP:
899 exporter->exopen = fbExporterOpenSocket;
900 exporter->exwrite = fbExporterWriteTCP;
901 exporter->exclose = fbExporterCloseSocket;
902 exporter->mtu = 8192;
903 break;
904 case FB_UDP:
905 exporter->exopen = fbExporterOpenSocket;
906 exporter->exwrite = fbExporterWriteUDP;
907 exporter->exclose = fbExporterCloseSocket;
908 exporter->mtu = 1420;
909 break;
910 #if HAVE_OPENSSL
911 #if HAVE_OPENSSL_DTLS_SCTP
912 case FB_DTLS_SCTP:
913 exporter->exopen = fbExporterOpenDTLS;
914 exporter->exwrite = fbExporterWriteTLS;
915 exporter->exclose = fbExporterCloseTLS;
916 exporter->sctp_mode = FB_F_SCTP_AUTOSTREAM;
917 exporter->sctp_stream = 0;
918 exporter->mtu = 8192;
919 break;
920 #endif
921 case FB_TLS_TCP:
922 exporter->exopen = fbExporterOpenTLS;
923 exporter->exwrite = fbExporterWriteTLS;
924 exporter->exclose = fbExporterCloseTLS;
925 exporter->mtu = 8192;
926 break;
927 #if HAVE_OPENSSL_DTLS
928 case FB_DTLS_UDP:
929 exporter->exopen = fbExporterOpenDTLS;
930 exporter->exwrite = fbExporterWriteTLS;
931 exporter->exclose = fbExporterCloseTLS;
932 exporter->mtu = 1320;
933 break;
934 #endif
935 #endif
936 default:
937 #ifndef FB_ENABLE_SCTP
938 if (spec->transport == FB_SCTP || spec->transport == FB_DTLS_SCTP) {
939 g_error("Libfixbuf not enabled for SCTP Transport. "
940 " Run configure with --with-sctp");
941 }
942 #endif
943 if (spec->transport == FB_TLS_TCP || spec->transport == FB_DTLS_SCTP ||
944 spec->transport == FB_DTLS_UDP) {
945 g_error("Libfixbuf not enabled for this mode of transport. "
946 " Run configure with --with-openssl");
947 }
948 }
949
950 /* Return new exporter */
951 return exporter;
952 }
953
954 #if HAVE_SPREAD
955
956 /**
957 * fbExporterSpreadReceiver
958 *
959 */
960
fbExporterSpreadReceiver(void * arg)961 static void * fbExporterSpreadReceiver(
962 void *arg)
963 {
964 int i = 0;
965 char grp[MAX_GROUP_NAME];
966 fbExporter_t *exporter = (fbExporter_t *)arg;
967 fbSpreadSpec_t *spread = exporter->spec.spread;
968 service service_type = 0;
969 char sender[MAX_GROUP_NAME];
970 int num_groups = 0;
971 int16 mess_type = 0;
972 int endian_mismatch;
973 int run = 1;
974 int ret;
975 membership_info memb_info;
976
977 #if 0
978 /* this loop is to allow a debugger to be attached */
979 int foo = 1;
980 do {
981 sleep(1);
982 } while (foo);
983 #endif
984
985 ret = SP_connect(spread->daemon, 0, 0, 1,
986 &(spread->recv_mbox), spread->recv_privgroup);
987
988 if (ret != ACCEPT_SESSION) {
989
990 g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_CONN,
991 "error connecting to Spread daemon %s: %s",
992 spread->daemon, fbConnSpreadError(ret));
993 return 0;
994 }
995
996 for (i = 0; i < spread->num_groups; i++) {
997 /* exporter listener only joins template/control plane group,
998 the group name is always the name of the data plane group
999 plus 'T' added to the end. */
1000 memset(grp, 0, sizeof(grp));
1001 strncpy(grp, spread->groups[i].name, sizeof(grp) - 2);
1002 strcat(grp, "T");
1003 ret = SP_join(spread->recv_mbox, grp);
1004 if (ret) {
1005 g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_CONN,
1006 "error joining to Spread group %s: %s",
1007 spread->groups[i].name, fbConnSpreadError(ret));
1008 return 0;
1009 }
1010 }
1011
1012 do {
1013 ret = SP_receive(spread->recv_mbox, &service_type, sender,
1014 spread->recv_max_groups, &num_groups,
1015 (char (*)[])spread->recv_groups,
1016 &mess_type, &endian_mismatch, spread->recv_max,
1017 spread->recv_mess);
1018
1019 if (spread->recv_exit) {
1020 SP_disconnect(spread->recv_mbox);
1021 continue;
1022 }
1023
1024 if (ret < 0) {
1025
1026 if (ret == GROUPS_TOO_SHORT) {
1027 g_free(spread->recv_groups);
1028 spread->recv_max_groups = -ret;
1029 spread->recv_groups = g_new0(sp_groupname_t,
1030 spread->recv_max_groups);
1031 } else if (ret == BUFFER_TOO_SHORT) {
1032 g_free(spread->recv_mess);
1033 spread->recv_max = -endian_mismatch;
1034 spread->recv_mess = g_new0(char, spread->recv_max);
1035 } else {
1036 g_set_error(&(spread->recv_err), FB_ERROR_DOMAIN, FB_ERROR_IO,
1037 "error receiving Spread message: %s",
1038 fbConnSpreadError(ret));
1039
1040 SP_disconnect(spread->recv_mbox);
1041
1042 run = 0;
1043 }
1044 continue;
1045 }
1046 /* actually received a message! Now process it */
1047 if (!Is_reg_memb_mess(service_type) ||
1048 !Is_caused_join_mess(service_type))
1049 continue;
1050
1051 if (SP_get_memb_info(spread->recv_mess, service_type, &memb_info) < 0)
1052 continue;
1053 if (!memb_info.changed_member[0])
1054 continue;
1055 if (strncmp(memb_info.changed_member, spread->recv_privgroup,
1056 MAX_GROUP_NAME) == 0)
1057 {
1058 continue;
1059 }
1060
1061 /** Send Relevant Templates to New Member only. */
1062 /** memb_info.changed_member is private group name */
1063 /** sender is group they are subscribing to */
1064 fbSessionSetPrivateGroup(spread->session,
1065 sender, memb_info.changed_member);
1066
1067 } while (run);
1068
1069 return 0;
1070 }
1071
1072 /**
1073 * fbExporterSpreadOpen
1074 *
1075 * @param exporter
1076 * @param err
1077 */
fbExporterSpreadOpen(fbExporter_t * exporter,GError ** err)1078 static gboolean fbExporterSpreadOpen(
1079 fbExporter_t *exporter,
1080 GError **err )
1081 {
1082 int ret;
1083
1084 if (!exporter->spec.spread->daemon) {
1085 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1086 "Spread daemon name cannot be null");
1087 return FALSE;
1088 }
1089 if (!exporter->spec.spread->daemon[0]) {
1090 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1091 "Spread daemon name cannot be empty");
1092 return FALSE;
1093 }
1094
1095 if (!(memchr(exporter->spec.spread->daemon, 0, 261))) {
1096 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1097 "Spread daemon name too long");
1098 return FALSE;
1099 }
1100 if (!exporter->spec.spread->groups) {
1101 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1102 "Spread groups cannot be null");
1103 return FALSE;
1104 }
1105 if (!exporter->spec.spread->groups[0].name[0]) {
1106 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1107 "Spread groups cannot be empty");
1108 return FALSE;
1109 }
1110 if (!exporter->spec.spread->session) {
1111 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1112 "Spread session cannot be null");
1113 return FALSE;
1114 }
1115
1116 pthread_mutex_init(&(exporter->spec.spread->write_lock), 0);
1117
1118 ret = SP_connect(exporter->spec.spread->daemon, 0, 0, 0,
1119 &(exporter->spec.spread->mbox),
1120 exporter->spec.spread->privgroup);
1121 if (ret != ACCEPT_SESSION) {
1122 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1123 "error connecting to Spread daemon %s: %s",
1124 exporter->spec.spread->daemon, fbConnSpreadError(ret));
1125 return FALSE;
1126 }
1127
1128 exporter->spec.spread->recv_err = 0;
1129 exporter->spec.spread->recv_exit = 0;
1130 ret = pthread_create(&(exporter->spec.spread->recv_thread), NULL,
1131 fbExporterSpreadReceiver, exporter );
1132 if (ret) {
1133 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1134 "error creating Spread receiver thread: %s",strerror(ret));
1135 SP_disconnect(exporter->spec.spread->mbox);
1136 return FALSE;
1137 }
1138
1139 /*pthread_detach(exporter->spec.spread->recv_thread);*/
1140
1141 exporter->active = TRUE;
1142 return TRUE;
1143 }
1144
1145 /**
1146 * fbExporterSpreadWrite
1147 *
1148 * @param exporter
1149 * @param msg
1150 * @param msg size
1151 * @param err
1152 */
fbExporterSpreadWrite(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)1153 static gboolean fbExporterSpreadWrite(
1154 fbExporter_t *exporter,
1155 uint8_t *msgbase,
1156 size_t msglen,
1157 GError **err)
1158 {
1159 int ret = 0;
1160 fbSpreadSpec_t *spread;
1161
1162 pthread_mutex_lock(&(exporter->spec.spread->write_lock));
1163
1164 spread = exporter->spec.spread;
1165
1166 if (spread->num_groups_to_send == 1) {
1167 ret = SP_multicast(spread->mbox, RELIABLE_MESS,
1168 spread->groups_to_send[0].name,
1169 0, msglen, (const char *)msgbase);
1170 } else if (spread->num_groups == 1) {
1171 ret = SP_multicast(spread->mbox, RELIABLE_MESS,
1172 spread->groups[0].name,
1173 0, msglen, (const char *)msgbase);
1174 } else if (spread->num_groups_to_send > 1) {
1175 ret = SP_multigroup_multicast(spread->mbox, RELIABLE_MESS,
1176 spread->num_groups_to_send,
1177 (const char (*)[])spread->groups_to_send,
1178 0, msglen, (const char *)msgbase);
1179
1180 } else {
1181 ret = SP_multigroup_multicast(spread->mbox, RELIABLE_MESS,
1182 spread->num_groups,
1183 (const char (*)[])spread->groups,
1184 0, msglen, (const char *)msgbase);
1185 }
1186
1187 if (ret < 0) {
1188 g_set_error(err, FB_ERROR_DOMAIN, FB_ERROR_CONN,
1189 "error receiving Spread message: %s",
1190 fbConnSpreadError(ret));
1191 }
1192
1193 pthread_mutex_unlock(&(exporter->spec.spread->write_lock));
1194
1195 return (ret < 0) ? FALSE : TRUE;
1196 }
1197
1198 /**
1199 * fbExporterSpreadClose
1200 *
1201 * @param exporter
1202 */
1203
fbExporterSpreadClose(fbExporter_t * exporter)1204 static void fbExporterSpreadClose(
1205 fbExporter_t *exporter)
1206 {
1207 if (exporter->active) {
1208 pthread_cancel(exporter->spec.spread->recv_thread);
1209 pthread_join(exporter->spec.spread->recv_thread, NULL);
1210 SP_disconnect(exporter->spec.spread->mbox);
1211 }
1212 exporter->active = FALSE;
1213 }
1214
1215 /**
1216 * fbExporterAllocSpread
1217 *
1218 * @param Spread_params
1219 */
fbExporterAllocSpread(fbSpreadParams_t * params)1220 fbExporter_t *fbExporterAllocSpread(
1221 fbSpreadParams_t *params)
1222 {
1223 fbExporter_t *exporter = NULL;
1224
1225 g_assert(params->daemon);
1226 g_assert(params->groups);
1227 g_assert(params->groups[0]);
1228
1229 exporter = g_slice_new0(fbExporter_t);
1230
1231 exporter->spec.spread = fbConnSpreadCopy(params);
1232
1233 exporter->exopen = fbExporterSpreadOpen;
1234 exporter->exwrite = fbExporterSpreadWrite;
1235 exporter->exclose = fbExporterSpreadClose;
1236 #ifdef DEBUG
1237 exporter->mtu = 8192;
1238 #else
1239 exporter->mtu = FB_SPREAD_MTU;
1240 #endif
1241
1242 return exporter;
1243 }
1244
1245 /**
1246 * fbExporterSetGroupsToSend
1247 *
1248 * @param exporter
1249 * @param groups
1250 * @param num_groups
1251 *
1252 */
fbExporterSetGroupsToSend(fbExporter_t * exporter,char ** groups,int num_groups)1253 void fbExporterSetGroupsToSend(
1254 fbExporter_t *exporter,
1255 char **groups,
1256 int num_groups)
1257 {
1258 int n = 0;
1259 char **g = 0;
1260 fbSpreadSpec_t *spread = exporter->spec.spread;
1261
1262 if (!spread->groups_to_send) {
1263 spread->groups_to_send = g_new0(sp_groupname_t, spread->num_groups);
1264 }
1265
1266 g = groups;
1267
1268 for (n=0; n < num_groups; n++) {
1269 strncpy(spread->groups_to_send[n].name, *g, MAX_GROUP_NAME-1);
1270 g++;
1271 }
1272
1273 spread->num_groups_to_send = n;
1274 }
1275
1276 /**
1277 * fbExporterCheckGroups
1278 *
1279 * If we are sending to a new (different) group of Spread Groups - return TRUE
1280 * to emit the buffer and set new export groups.
1281 *
1282 * @param exporter
1283 * @param groups
1284 * @param num_groups
1285 */
fbExporterCheckGroups(fbExporter_t * exporter,char ** groups,int num_groups)1286 gboolean fbExporterCheckGroups(
1287 fbExporter_t *exporter,
1288 char **groups,
1289 int num_groups)
1290 {
1291 int n;
1292 fbSpreadSpec_t *spread = exporter->spec.spread;
1293
1294 if (num_groups != spread->num_groups_to_send) {
1295 return TRUE;
1296 }
1297
1298 for (n = 0; n < num_groups; n++) {
1299 if (strcmp(spread->groups_to_send[n].name, groups[n]) != 0) {
1300 return TRUE;
1301 }
1302 }
1303
1304 return FALSE;
1305
1306 }
1307
1308 #endif /* HAVE_SPREAD */
1309
1310 /**
1311 *fbExporterSetStream
1312 *
1313 * @param exporter
1314 * @param sctp_stream
1315 *
1316 */
fbExporterSetStream(fbExporter_t * exporter,int sctp_stream)1317 void fbExporterSetStream(
1318 fbExporter_t *exporter,
1319 int sctp_stream)
1320 {
1321 exporter->sctp_mode &= ~FB_F_SCTP_AUTOSTREAM;
1322 exporter->sctp_stream = sctp_stream;
1323 }
1324
1325 /**
1326 *fbExporterAutoStream
1327 *
1328 * @param exporter
1329 *
1330 *
1331 */
fbExporterAutoStream(fbExporter_t * exporter)1332 void fbExporterAutoStream(
1333 fbExporter_t *exporter)
1334 {
1335 exporter->sctp_mode |= FB_F_SCTP_AUTOSTREAM;
1336 }
1337
1338 #if 0
1339 /**
1340 *fbExporterSetPRTTL
1341 *
1342 * @param exporter
1343 * @param pr_ttl
1344 *
1345 *
1346 */
1347 void fbExporterSetPRTTL(
1348 fbExporter_t *exporter,
1349 int pr_ttl)
1350 {
1351 if (pr_ttl > 0) {
1352 exporter->sctp_mode |= FB_F_SCTP_PR_TTL;
1353 exporter->sctp_pr_param = pr_ttl;
1354 } else {
1355 exporter->sctp_mode &= ~FB_F_SCTP_PR_TTL;
1356 exporter->sctp_pr_param = 0;
1357 }
1358 }
1359 #endif /* 0 */
1360
1361 /**
1362 *fbExportMessage
1363 *
1364 *
1365 * @param exporter
1366 * @param msgbase
1367 * @param msglen
1368 * @param err
1369 *
1370 * @return
1371 *
1372 */
fbExportMessage(fbExporter_t * exporter,uint8_t * msgbase,size_t msglen,GError ** err)1373 gboolean fbExportMessage(
1374 fbExporter_t *exporter,
1375 uint8_t *msgbase,
1376 size_t msglen,
1377 GError **err)
1378 {
1379 /* Ensure stream is open */
1380 if (!exporter->active) {
1381 g_assert(exporter->exopen);
1382 if (!exporter->exopen(exporter, err)) return FALSE;
1383 }
1384
1385 /* Attempt to write message */
1386 if (exporter->exwrite(exporter, msgbase, msglen, err)) return TRUE;
1387
1388 /* Close exporter on write failure */
1389 if (exporter->exclose) exporter->exclose(exporter);
1390 return FALSE;
1391 }
1392
1393 /**
1394 *fbExporterFree
1395 *
1396 *
1397 * @param exporter
1398 *
1399 */
fbExporterFree(fbExporter_t * exporter)1400 void fbExporterFree(
1401 fbExporter_t *exporter)
1402 {
1403 fbExporterClose(exporter);
1404 if (exporter->exwrite == fbExporterWriteFile)
1405 {
1406 g_free(exporter->spec.path);
1407 }
1408 #ifdef HAVE_SPREAD
1409 else if (exporter->exwrite == fbExporterSpreadWrite)
1410 {
1411 fbConnSpreadFree( exporter->spec.spread );
1412 }
1413 #endif
1414 else {
1415 fbConnSpecFree(exporter->spec.conn);
1416 }
1417
1418 g_slice_free(fbExporter_t, exporter);
1419 }
1420
1421 /**
1422 *fbExporterClose
1423 *
1424 *
1425 *
1426 * @param exporter
1427 */
fbExporterClose(fbExporter_t * exporter)1428 void fbExporterClose(
1429 fbExporter_t *exporter)
1430 {
1431 if (exporter->active && exporter->exclose) exporter->exclose(exporter);
1432 }
1433
1434 /**
1435 * fbExporterGetMsgLen
1436 *
1437 *
1438 * @param exporter
1439 */
1440
fbExporterGetMsgLen(fbExporter_t * exporter)1441 size_t fbExporterGetMsgLen(
1442 fbExporter_t *exporter)
1443 {
1444 return exporter->msg_len;
1445 }
1446